Re: Rename filter() into keep(), remove() or take() ?

2014-02-27 Thread Nick Pentreath
Agree that filter is perhaps unintuitive. Though the Scala collections API has 
filter and filterNot which together provide context that makes it more 
intuitive.


And yes the change could be via added methods that don't break existing API.


Still, overall I would be -1 on this unless a significant proportion of users 
would find added value in it.




Actually, adding filterNot, while not strictly necessary, would make more sense in 
my view
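
For illustration, a minimal sketch of how a filterNot could be offered without
touching the existing API, e.g. as an enrichment that simply negates the
predicate (purely hypothetical, not an actual Spark method):

import org.apache.spark.rdd.RDD

object RDDExtras {
  // Hypothetical enrichment: filterNot keeps the elements for which the predicate is false.
  implicit class RichRDD[T](val rdd: RDD[T]) {
    def filterNot(p: T => Boolean): RDD[T] = rdd.filter(x => !p(x))
  }
}

// usage (assuming an existing SparkContext sc):
// import RDDExtras._
// sc.parallelize(1 to 10).filterNot(_ % 2 == 0)   // 1, 3, 5, 7, 9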








—
Sent from Mailbox for iPhone

On Thu, Feb 27, 2014 at 3:56 PM, Bertrand Dechoux decho...@gmail.com
wrote:

 I understand the explanation but I had to try. However, the change could be
 made without breaking anything but that's another story.
 Regards
 Bertrand
 Bertrand Dechoux
 On Thu, Feb 27, 2014 at 2:05 PM, Nick Pentreath 
 nick.pentre...@gmail.comwrote:
 filter comes from the Scala collection method filter. I'd say it's best
 to keep in line with the Scala collections API, as Spark has done with RDDs
 generally (map, flatMap, take etc), so that it is easier and more natural for
 developers to apply the same thinking from Scala (parallel) collections to
 Spark RDDs.

 Plus, such an API change would be a major breaking one and IMO not a good
 idea at this stage.

 def filter(p: (A) => Boolean): Seq[A]

 Selects all elements of this sequence which satisfy a predicate.
 p: the predicate used to test elements.
 returns: a new sequence consisting of all elements of this sequence that satisfy
 the given predicate p. The order of the elements is preserved.


 On Thu, Feb 27, 2014 at 2:36 PM, Bertrand Dechoux decho...@gmail.comwrote:

 Hi,

 It might seem like a trivial issue, but even though it is somehow a
 standard name, filter() is not really explicit about which way it works.
 Sure, it makes sense to provide a filter function, but what happens when it
 returns true? Is the current element removed or kept? It is not really
 obvious.

 Has another name been already discussed? It could be keep() or remove().
 But take() could also be reused and instead of providing a number, the
 filter function could be requested.

  Regards

 Bertrand




Re: Running actions in loops

2014-03-07 Thread Nick Pentreath
There is #3, which is to use mapPartitions and init one jodatime object per partition, 
which is less overhead for large objects.
—
Sent from Mailbox for iPhone
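
A minimal sketch of option #3 (the input path, date pattern and field index are
illustrative, and an existing SparkContext sc is assumed): the non-serializable
joda-time formatter is created once per partition on the worker, so it never has
to be shipped from the driver.

import org.joda.time.format.DateTimeFormat

val lines = sc.textFile("hdfs:///logs/events.csv")
val timestamps = lines.mapPartitions { iter =>
  // built on the worker, once per partition, never serialized as part of the closure
  val fmt = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
  iter.map(line => fmt.parseDateTime(line.split(",")(0)))
}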

On Sat, Mar 8, 2014 at 2:54 AM, Mayur Rustagi mayur.rust...@gmail.com
wrote:

 So the whole function closure you want to apply on your RDD needs to be
 serializable so that it can be serialized & sent to workers to operate on the
 RDD. So objects of jodatime cannot be serialized & sent, hence jodatime is
 out of work. 2 bad answers:
 1. initialize jodatime for each row & complete work & destroy them; that
 way they are only initialized when the job is running & need not be sent across.
 2. Write your own parser & hope the jodatime guys get their act together.
 Regards
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi
 On Fri, Mar 7, 2014 at 12:56 PM, Ognen Duzlevski
 og...@nengoiksvelzud.comwrote:
  Mayur, have not thought of that. Yes, I use jodatime. What is the scope
 that this serialization issue applies to? Only the method making a call
 into / using such a library? The whole class the method using such a
 library belongs to? Sorry if it is a dumb question :)

 Ognen


 On 3/7/14, 1:29 PM, Mayur Rustagi wrote:

 Most likely the job you are executing is not serializable; this typically
 happens when you have a library that is not serializable. Are you using
 any library like jodatime, etc.?

  Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
  @mayur_rustagi https://twitter.com/mayur_rustagi



 On Thu, Mar 6, 2014 at 9:50 PM, Ognen Duzlevski 
 og...@plainvanillagames.com wrote:

 It looks like the problem is in the filter task - is there anything
 special about filter()?

 I have removed the filter line from the loops just to see if things will
 work and they do.

 Anyone has any ideas?

 Thanks!
 Ognen


 On 3/6/14, 9:39 PM, Ognen Duzlevski wrote:

 Hello,

 What is the general approach people take when trying to do analysis
 across multiple large files where the data to be extracted from a
 successive file depends on the data extracted from a previous file or set
 of files?

 For example:
 I have the following: a group of HDFS files each 20+GB in size. I need
 to extract event1 on day 1 from first file and extract event2 from all
 remaining files in a period of successive dates, then do a calculation on
 the two events.
 I then need to move on to day2, extract event1 (with certain
 properties), take all following days, extract event2 and run a calculation
 against previous day for all days in period. So on and so on.

 I have verified that the following (very naive) approach doesn't work:

 def calcSimpleRetention(start: String, end: String, event1: String, event2: String): Map[String, List[Double]] = {
   val epd = new PipelineDate(end)
   val result = for {
     dt1 <- PipelineDate.getPeriod(new PipelineDate(start), epd)
     val f1 = sc.textFile(dt1.toJsonHdfsFileName)
     val e1 = f1.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event1)
                .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 0)).cache
     val c = e1.count.toDouble

     val intres = for {
       dt2 <- PipelineDate.getPeriod(dt1 + 1, epd)
       val f2 = sc.textFile(dt2.toJsonHdfsFileName)
       val e2 = f2.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event2)
                  .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 1))
       val e1e2 = e1.union(e2)
       val r = e1e2.groupByKey().filter(e => e._2.length > 1 &&
                 e._2.filter(_ == 0).length > 0).count.toDouble
     } yield (c / r) // get the retention rate
   } yield (dt1.toString -> intres)
   Map(result: _*)
 }

 I am getting the following errors:
 14/03/07 03:22:25 INFO SparkContext: Starting job: count at
 CountActor.scala:33
 14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at
 CountActor.scala:33) with 140 output partitions (allowLocal=false)
 14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0 (count at
 CountActor.scala:33)
 14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage: List()
 14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List()
 14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at
 map at CountActor.scala:32), which has no missing parents
 14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at
 CountActor.scala:33
 14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task not
 serializable: java.io.NotSerializableException:
 com.github.ognenpv.pipeline.CountActor
 org.apache.spark.SparkException: Job aborted: Task not serializable:
 java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at 

Re: Running Spark on a single machine

2014-03-16 Thread Nick Pentreath
Please follow the instructions at 
http://spark.apache.org/docs/latest/index.html and 
http://spark.apache.org/docs/latest/quick-start.html to get started on a local 
machine.
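
As a quick illustration (a sketch only; the app name, thread count and numbers
are arbitrary), running entirely on one machine just means using a local master,
either via --master local[N] on spark-submit or directly in code:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("single-machine-example").setMaster("local[*]") // all local cores
val sc = new SparkContext(conf)
val sum = sc.parallelize(1 to 1000).map(_ * 2).reduce(_ + _) // runs in-process, no cluster needed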




—
Sent from Mailbox for iPhone

On Sun, Mar 16, 2014 at 11:39 PM, goi cto goi@gmail.com wrote:

 Hi,
 I know it is probably not the purpose of Spark, but the syntax is easy and
 cool...
 I need to run some Spark-like code in memory on a single machine. Any
 pointers on how to optimize it to run only on one machine?
 -- 
 Eran | CTO

Re: Calling Spahk enthusiasts in Boston

2014-03-31 Thread Nick Pentreath
I would offer to host one in Cape Town but we're almost certainly the only 
Spark users in the country apart from perhaps one in Johannesburg :)
—
Sent from Mailbox for iPhone

On Mon, Mar 31, 2014 at 8:53 PM, Nicholas Chammas
nicholas.cham...@gmail.com wrote:

 My fellow Bostonians and New Englanders,
 We cannot allow New York to beat us to having a banging Spark meetup.
 Respond to me (and I guess also Andy?) if you are interested.
 Yana,
 I'm not sure either what is involved in organizing, but we can figure it
 out. I didn't know about the meetup that never took off.
 Nick
 On Mon, Mar 31, 2014 at 2:31 PM, Yana Kadiyska yana.kadiy...@gmail.comwrote:
 Nicholas, I'm in Boston and would be interested in a Spark group. Not
 sure if you know this -- there was a meetup that never got off the
 ground. Anyway, I'd be +1 for attending. Not sure what is involved in
 organizing. Seems a shame that a city like Boston doesn't have one.

 On Mon, Mar 31, 2014 at 2:02 PM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
  As in, I am interested in helping organize a Spark meetup in the Boston
  area.
 
 
  On Mon, Mar 31, 2014 at 2:00 PM, Nicholas Chammas
  nicholas.cham...@gmail.com wrote:
 
  Well, since this thread has played out as it has, lemme throw in a
  shout-out for Boston.

 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Calling-Spahk-enthusiasts-in-Boston-tp3544.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

NPE using saveAsTextFile

2014-04-08 Thread Nick Pentreath
Hi

I'm using Spark 0.9.0.

When calling saveAsTextFile on a custom hadoop inputformat (loaded with
newAPIHadoopRDD), I get the following error below.

If I call count, I get the correct count of number of records, so the
inputformat is being read correctly... the issue only appears when trying
to use saveAsTextFile.

If I call first() I get the correct output, also. So it doesn't appear to
be anything with the data or inputformat.

Any idea what the actual problem is? This stack trace is not very informative
(though it seems to be ResultTask serialization that ultimately causes this).

Is this a known issue at all?


==

14/04/08 16:00:46 ERROR OneForOneStrategy:
java.lang.NullPointerException
at
com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
at
com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
at com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
at scala.collection.immutable.$colon$colon.writeObject(List.scala:379)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:28)
at org.apache.spark.scheduler.ResultTask$.serializeInfo(ResultTask.scala:48)
at org.apache.spark.scheduler.ResultTask.writeExternal(ResultTask.scala:123)
at
java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1443)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1414)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at 

Re: NPE using saveAsTextFile

2014-04-10 Thread Nick Pentreath
Ok, I thought it may be closing over the config object. I am using config
for job configuration, but extracting vals from it, so I'm not sure why, as I
thought I'd avoided closing over it. Will go back to the source and see where
it is creeping in.
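
For anyone hitting the same thing, a hypothetical illustration of how a Typesafe
Config object can creep into a task closure, and the usual fix of copying the
needed value into a local val first (the config key, paths and RDD contents are
made up, and an existing SparkContext sc is assumed):

import com.typesafe.config.ConfigFactory

val config = ConfigFactory.load()            // the Config object itself is not something you want in a closure
val rdd = sc.parallelize(Seq("a", "b", "c"))

// Problematic: the lambda references `config`, so the whole Config object
// is pulled into the closure and must be serialized with the task.
// rdd.map(s => s + config.getString("app.suffix")).saveAsTextFile("/tmp/out-bad")

// Safer: extract the value on the driver; only the String is captured.
val suffix = config.getString("app.suffix")
rdd.map(s => s + suffix).saveAsTextFile("/tmp/out-ok")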



On Thu, Apr 10, 2014 at 8:42 AM, Matei Zaharia matei.zaha...@gmail.comwrote:

 I haven't seen this but it may be a bug in Typesafe Config, since this is
 serializing a Config object. We don't actually use Typesafe Config
 ourselves.

 Do you have any nulls in the data itself by any chance? And do you know
 how that Config object is getting there?

 Matei

 On Apr 9, 2014, at 11:38 PM, Nick Pentreath nick.pentre...@gmail.com
 wrote:

 Anyone have a chance to look at this?

 Am I just doing something silly somewhere?

 If it makes any difference, I am using the elasticsearch-hadoop plugin for
 ESInputFormat. But as I say, I can parse the data (count, first() etc). I
 just can't save it as text file.




 On Tue, Apr 8, 2014 at 4:50 PM, Nick Pentreath 
 nick.pentre...@gmail.comwrote:

 Hi

 I'm using Spark 0.9.0.

 When calling saveAsTextFile on a custom hadoop inputformat (loaded with
 newAPIHadoopRDD), I get the following error below.

 If I call count, I get the correct count of number of records, so the
 inputformat is being read correctly... the issue only appears when trying
 to use saveAsTextFile.

 If I call first() I get the correct output, also. So it doesn't appear to
 be anything with the data or inputformat.

 Any idea what the actual problem is, since this stack trace is not
 obvious (though it seems to be in ResultTask which ultimately causes this).

 Is this a known issue at all?


 ==

 14/04/08 16:00:46 ERROR OneForOneStrategy:
 java.lang.NullPointerException
  at
 com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
 at
 com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
  at
 com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:601)
 at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975)
  at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
 at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
  at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
 at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
  at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
 at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
  at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
 at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
  at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
 at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
  at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
 at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
  at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
 at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
 at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
  at scala.collection.immutable.$colon$colon.writeObject(List.scala:379)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:601)
 at java.io.ObjectStreamClass.invokeWriteObject

Re: NPE using saveAsTextFile

2014-04-10 Thread Nick Pentreath
There was a closure over the config object lurking around - but in any case
upgrading to Typesafe Config 1.2.0 did the trick, as it seems to have been a bug
in Typesafe Config.

Thanks Matei!


On Thu, Apr 10, 2014 at 8:46 AM, Nick Pentreath nick.pentre...@gmail.comwrote:

 Ok I thought it may be closing over the config option. I am using config
 for job configuration, but extracting vals from that. So not sure why as I
 thought I'd avoided closing over it. Will go back to source and see where
 it is creeping in.



 On Thu, Apr 10, 2014 at 8:42 AM, Matei Zaharia matei.zaha...@gmail.comwrote:

 I haven't seen this but it may be a bug in Typesafe Config, since this is
 serializing a Config object. We don't actually use Typesafe Config
 ourselves.

 Do you have any nulls in the data itself by any chance? And do you know
 how that Config object is getting there?

 Matei

 On Apr 9, 2014, at 11:38 PM, Nick Pentreath nick.pentre...@gmail.com
 wrote:

 Anyone have a chance to look at this?

 Am I just doing something silly somewhere?

 If it makes any difference, I am using the elasticsearch-hadoop plugin
 for ESInputFormat. But as I say, I can parse the data (count, first() etc).
 I just can't save it as text file.




 On Tue, Apr 8, 2014 at 4:50 PM, Nick Pentreath 
 nick.pentre...@gmail.comwrote:

 Hi

 I'm using Spark 0.9.0.

 When calling saveAsTextFile on a custom hadoop inputformat (loaded with
 newAPIHadoopRDD), I get the following error below.

 If I call count, I get the correct count of number of records, so the
 inputformat is being read correctly... the issue only appears when trying
 to use saveAsTextFile.

 If I call first() I get the correct output, also. So it doesn't appear
 to be anything with the data or inputformat.

 Any idea what the actual problem is, since this stack trace is not
 obvious (though it seems to be in ResultTask which ultimately causes this).

 Is this a known issue at all?


 ==

 14/04/08 16:00:46 ERROR OneForOneStrategy:
 java.lang.NullPointerException
  at
 com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
 at
 com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
  at
 com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:601)
 at
 java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975)
  at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
 at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
  at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
  at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
 at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
  at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
  at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
 at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
  at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
  at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
 at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
  at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
  at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
 at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
  at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
  at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
 at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
  at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
  at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
 at
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
  at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
 at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
  at scala.collection.immutable.$colon$colon.writeObject(List.scala:379)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native

Re: StackOverflow Error when run ALS with 100 iterations

2014-04-16 Thread Nick Pentreath
I'd also say that running for 100 iterations is a waste of resources, as
ALS will typically converge pretty quickly, as in within 10-20 iterations.


On Wed, Apr 16, 2014 at 3:54 AM, Xiaoli Li lixiaolima...@gmail.com wrote:

 Thanks a lot for your information. It really helps me.


 On Tue, Apr 15, 2014 at 7:57 PM, Cheng Lian lian.cs@gmail.com wrote:

 Probably this JIRA issue https://spark-project.atlassian.net/browse/SPARK-1006
 solves your problem. When running with a large iteration number, the lineage
 DAG of ALS becomes very deep, both DAGScheduler and Java serializer may
 overflow because they are implemented in a recursive way. You may resort to
 checkpointing as a workaround.
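
 A self-contained sketch of that checkpointing workaround (all names, values and
 the checkpoint interval are illustrative): truncating the lineage every few
 iterations keeps the DAG, and the recursive serialization it triggers, shallow.

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.SparkContext._  // pair RDD functions on older Spark versions

 val sc = new SparkContext(new SparkConf().setAppName("checkpoint-demo").setMaster("local[2]"))
 sc.setCheckpointDir("/tmp/spark-checkpoints")

 val outlinks = sc.parallelize(Seq((1, 2.0), (2, 3.0))).cache()
 var solution = sc.parallelize(Seq((1, 1.0), (2, 1.0)))

 for (i <- 1 to 100) {
   solution = outlinks.join(solution).map { case (k, (w, v)) => (k, w * v) }
   if (i % 10 == 0) {
     solution.checkpoint()  // mark the RDD to be written to the checkpoint dir
     solution.count()       // an action forces the checkpoint, cutting the lineage
   }
 }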


 On Wed, Apr 16, 2014 at 5:29 AM, Xiaoli Li lixiaolima...@gmail.comwrote:

 Hi,

 I am testing ALS using 7 nodes. Each node has 4 cores and 8G memory.
 The ALS program cannot run even with a very small training data set (about
 91 lines) due to a StackOverflow error when I set the number of iterations to
 100. I think the problem may be caused by the updateFeatures method, which
 updates the products RDD iteratively by joining the previous products RDD.


 I am writing a program which has a similar update process to ALS.
 This problem also appeared when I iterated too many times (more than 80).

 The iterative part of my code is as following:

 solution = outlinks.join(solution). map {
  ...
  }


 Has anyone had similar problem?  Thanks.


 Xiaoli






Re: User/Product Clustering with pySpark ALS

2014-04-29 Thread Nick Pentreath
There's no easy way to do this currently. The pieces are there from the PySpark 
code for regression, which should be adaptable.


But you'd have to roll your own solution.




This is something I also want so I intend to put together a pull request for 
this soon
—
Sent from Mailbox
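
In the meantime, a rough Scala sketch of the same workflow (train ALS, then
cluster the latent user factors with K-means), since the Scala API does expose
the feature RDDs directly; the data loading and all parameters below are
illustrative, and an existing SparkContext sc is assumed:

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val ratings = sc.textFile("ratings.csv").map { line =>
  val Array(user, item, rating) = line.split(",")
  Rating(user.toInt, item.toInt, rating.toDouble)
}

val model = ALS.train(ratings, 10, 10)  // rank = 10, 10 iterations

// The low-dimensional user vectors, directly as an RDD, ready for clustering.
val userVectors = model.userFeatures.map { case (_, features) => Vectors.dense(features) }
val clusters = KMeans.train(userVectors, 5, 20)  // k = 5, 20 iterations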

On Tue, Apr 29, 2014 at 4:28 PM, Laird, Benjamin
benjamin.la...@capitalone.com wrote:

 Hi all -
 I’m using pySpark/MLLib ALS for user/item clustering and would like to 
 directly access the user/product RDDs (called userFeatures/productFeatures in 
 class MatrixFactorizationModel in 
 mllib/recommendation/MatrixFactorizationModel.scala
 This doesn’t seem too complex, but it doesn’t seem like the functionality is 
 currently available. I think it requires accessing the underlying java model 
 like so:
 model = ALS.train(ratings,1,iterations=1,blocks=5)
 userFeatures = RDD(model.javamodel.userFeatures, sc, ???)
 However, I don’t know what to pass as the deserializer. I need these low 
 dimensional vectors as an RDD to then use in Kmeans clustering. Has anyone 
 done something similar?
 Ben
 

spark-submit / S3

2014-05-16 Thread Nick Pentreath
Hi

I see from the docs for 1.0.0 that the new spark-submit mechanism seems
to support specifying the jar with hdfs:// or http://

Does this support S3? (It doesn't seem to; I have tried it on EC2 and it
doesn't work):

./bin/spark-submit --master local[2] --class myclass s3n://bucket/myapp.jar
args


Re: Spark on HBase vs. Spark on HDFS

2014-05-22 Thread Nick Pentreath
Hi

In my opinion, running HBase for immutable data is generally overkill in
particular if you are using Shark anyway to cache and analyse the data and
provide the speed.

HBase is designed for random-access data patterns and high throughput R/W
activities. If you are only ever writing immutable logs, then that is what
HDFS is designed for.

Having said that, if you replace HBase you will need to come up with a
reliable way to put data into HDFS (a log aggregator like Flume or message
bus like Kafka perhaps, etc), so the pain of doing that may not be worth it
given you already know HBase.


On Thu, May 22, 2014 at 9:33 AM, Limbeck, Philip philip.limb...@automic.com
 wrote:

  HI!



 We are currently using HBase as our primary data store of different
 event-like data. On-top of that, we use Shark to aggregate this data and
 keep it
 in memory for fast data access.  Since we use no specific HBase
 functionality whatsoever except Putting data into it, a discussion
 came up on having to set up an additional set of components on top of HDFS
 instead of just writing to HDFS directly.

  Is there any overview regarding implications of doing that ? I mean
 except things like taking care of file structure and the like. What is the
 true

 advantage of Spark on HBase in favor of Spark on HDFS?



 Best

 Philip





Re: Writing RDDs from Python Spark progrma (pyspark) to HBase

2014-05-28 Thread Nick Pentreath
It's not currently possible to write anything other than text (or pickle
files, I think in 1.0.0, or if not then in 1.0.1) from PySpark.

I have an outstanding pull request to add READING any InputFormat from
PySpark, and after that is in I will look into OutputFormat too.

What does your data look like? Any details about your use case that you
could share would aid the design of this feature.

N


On Wed, May 28, 2014 at 3:00 PM, gaurav.dasgupta gaurav.d...@gmail.comwrote:

 Hi,

 I am unable to understand how to write data directly on HBase table from a
 Spark (pyspark) Python program. Is this possible in the current Spark
 releases? If so, can someone provide an example code snippet to do this?

 Thanks in advance.

 Regards,
 Gaurav



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Writing-RDDs-from-Python-Spark-progrma-pyspark-to-HBase-tp6469.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Python, Spark and HBase

2014-05-29 Thread Nick Pentreath
Hi Tommer,

I'm working on updating and improving the PR, and will work on getting an
HBase example working with it. Will feed back as soon as I have had the
chance to work on this a bit more.

N


On Thu, May 29, 2014 at 3:27 AM, twizansk twiza...@gmail.com wrote:

 The code which causes the error is:

 sc = SparkContext("local", "My App")
 rdd = sc.newAPIHadoopFile(
     name,
     'org.apache.hadoop.hbase.mapreduce.TableInputFormat',
     'org.apache.hadoop.hbase.io.ImmutableBytesWritable',
     'org.apache.hadoop.hbase.client.Result',
     conf={"hbase.zookeeper.quorum": "my-host",
           "hbase.rootdir": "hdfs://my-host:8020/hbase",
           "hbase.mapreduce.inputtable": "data"})

 The full stack trace is:



 Py4JError Traceback (most recent call last)
 ipython-input-8-3b9a4ea2f659 in module()
   7 conf={hbase.zookeeper.quorum: my-host,
   8   hbase.rootdir: hdfs://my-host:8020/hbase,
  9   hbase.mapreduce.inputtable: data})
  10
  11

 /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.pyc in
 newAPIHadoopFile(self, name, inputformat_class, key_class, value_class,
 key_wrapper, value_wrapper, conf)
 281 for k, v in conf.iteritems():
 282 jconf[k] = v
 -- 283 jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc,
 name,
 inputformat_class, key_class, value_class,
 284 key_wrapper,
 value_wrapper, jconf)
 285 return RDD(jrdd, self, PickleSerializer())


 /opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py
 in __getattr__(self, name)
 657 else:
 658 raise Py4JError('{0} does not exist in the JVM'.
 -- 659 format(self._fqn + name))
 660
 661 def __call__(self, *args):

 Py4JError: org.apache.spark.api.python.PythonRDDnewAPIHadoopFile does not
 exist in the JVM



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Python-Spark-and-HBase-tp6142p6507.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Can't seem to link external/twitter classes from my own app

2014-06-04 Thread Nick Pentreath
@Sean, the %% syntax in SBT should automatically add the Scala major
version qualifier (_2.10, _2.11 etc) for you, so that does appear to be
correct syntax for the build.

I seemed to run into this issue with some missing Jackson deps, and solved
it by including the jar explicitly on the driver class path:

bin/spark-submit --driver-class-path
SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar --class
SimpleApp SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar

Seems redundant to me since I thought that the JAR as argument is copied to
driver and made available. But this solved it for me so perhaps give it a
try?



On Wed, Jun 4, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote:

 Those aren't the names of the artifacts:


 http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22spark-streaming-twitter_2.10%22

 The name is spark-streaming-twitter_2.10

 On Wed, Jun 4, 2014 at 1:49 PM, Jeremy Lee
 unorthodox.engine...@gmail.com wrote:
  Man, this has been hard going. Six days, and I finally got a "Hello
  World" app working that I wrote myself.
 
  Now I'm trying to make a minimal streaming app based on the twitter
  examples, (running standalone right now while learning) and when running
 it
  like this:
 
  bin/spark-submit --class SimpleApp
  SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar
 
  I'm getting this error:
 
  Exception in thread main java.lang.NoClassDefFoundError:
  org/apache/spark/streaming/twitter/TwitterUtils$
 
  Which I'm guessing is because I haven't put in a dependency to
  external/twitter in the .sbt, but _how_? I can't find any docs on it.
  Here's my build file so far:
 
  simple.sbt
  --
  name := "Simple Project"
 
  version := "1.0"
 
  scalaVersion := "2.10.4"
 
  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
 
  libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
 
  libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0"
 
  libraryDependencies += "org.twitter4j" % "twitter4j-stream" % "3.0.3"
 
  resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
  --
 
  I've tried a few obvious things like adding:
 
  libraryDependencies += "org.apache.spark" %% "spark-external" % "1.0.0"
 
  libraryDependencies += "org.apache.spark" %% "spark-external-twitter" % "1.0.0"
 
  because, well, that would match the naming scheme implied so far, but it
  errors.
 
 
  Also, I just realized I don't completely understand if:
  (a) the spark-submit command _sends_ the .jar to all the workers, or
  (b) the spark-submit commands sends a _job_ to the workers, which are
  supposed to already have the jar file installed (or in hdfs), or
  (c) the Context is supposed to list the jars to be distributed. (is that
  deprecated?)
 
  One part of the documentation says:
 
   Once you have an assembled jar you can call the bin/spark-submit
 script as
  shown here while passing your jar.
 
  but another says:
 
  application-jar: Path to a bundled jar including your application and
 all
  dependencies. The URL must be globally visible inside of your cluster,
 for
  instance, an hdfs:// path or a file:// path that is present on all
 nodes.
 
  I suppose both could be correct if you take a certain point of view.
 
  --
  Jeremy Lee  BCompSci(Hons)
The Unorthodox Engineers



Re: Can't seem to link external/twitter classes from my own app

2014-06-04 Thread Nick Pentreath
The magic incantation is "sbt assembly" (not "assemble").


Actually I find Maven with its assembly plugins to be very easy (mvn 
package). I can send a pom.xml for a skeleton project if you need it.
—
Sent from Mailbox

On Thu, Jun 5, 2014 at 6:59 AM, Jeremy Lee unorthodox.engine...@gmail.com
wrote:

 Hmm.. That's not working so well for me. First, I needed to add a
 project/plugin.sbt file with the contents:
 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.4")
 Before 'sbt/sbt assemble' worked at all. And I'm not sure about that
 version number, but 0.9.1 isn't working much better and 11.4 is the
 latest one recommended by the sbt project site. Where did you get your
 version from?
 Second, even when I do get it to build a .jar, spark-submit is still
 telling me the external.twitter library is missing.
 I tried using your github project as-is, but it also complained about the
 missing plugin.. I'm trying it with various versions now to see if I can
 get that working, even though I don't know anything about kafka. Hmm, and
 no. Here's what I get:
 [info] Set current project to Simple Project (in build
 file:/home/ubuntu/spark-1.0.0/SparkKafka/)
 [error] Not a valid command: assemble
 [error] Not a valid project ID: assemble
 [error] Expected ':' (if selecting a configuration)
 [error] Not a valid key: assemble (similar: assembly, assemblyJarName,
 assemblyDirectory)
 [error] assemble
 [error]
 I also found this project which seemed to be exactly what I was after:
 https://github.com/prabeesh/SparkTwitterAnalysis
 ...but it was for Spark 0.9, and though I updated all the version
 references to 1.0.0, that one doesn't work either. I can't even get it to
 build.
 *sigh*
 Is it going to be easier to just copy the external/ source code into my own
 project? Because I will... especially if creating Uberjars takes this
 long every... single... time...
 On Thu, Jun 5, 2014 at 8:52 AM, Jeremy Lee unorthodox.engine...@gmail.com
 wrote:
 Thanks Patrick!

 Uberjars. Cool. I'd actually heard of them. And thanks for the link to the
 example! I shall work through that today.

 I'm still learning sbt and it's many options... the last new framework I
 learned was node.js, and I think I've been rather spoiled by npm.

 At least it's not maven. Please, oh please don't make me learn maven too.
 (The only people who seem to like it have Software Stockholm Syndrome: I
 know maven kidnapped me and beat me up, but if you spend long enough with
 it, you eventually start to sympathize and see its point of view.)


 On Thu, Jun 5, 2014 at 3:39 AM, Patrick Wendell pwend...@gmail.com
 wrote:

 Hey Jeremy,

 The issue is that you are using one of the external libraries and
 these aren't actually packaged with Spark on the cluster, so you need
 to create an uber jar that includes them.

 You can look at the example here (I recently did this for a kafka
 project and the idea is the same):

 https://github.com/pwendell/kafka-spark-example

 You'll want to make an uber jar that includes these packages (run sbt
 assembly) and then submit that jar to spark-submit. Also, I'd try
 running it locally first (if you aren't already) just to make the
 debugging simpler.

 - Patrick


 On Wed, Jun 4, 2014 at 6:16 AM, Sean Owen so...@cloudera.com wrote:
  Ah sorry, this may be the thing I learned for the day. The issue is
  that classes from that particular artifact are missing though. Worth
  interrogating the resulting .jar file with jar tf to see if it made
  it in?
 
  On Wed, Jun 4, 2014 at 2:12 PM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:
  @Sean, the %% syntax in SBT should automatically add the Scala major
 version
  qualifier (_2.10, _2.11 etc) for you, so that does appear to be correct
  syntax for the build.
 
  I seemed to run into this issue with some missing Jackson deps, and
 solved
  it by including the jar explicitly on the driver class path:
 
  bin/spark-submit --driver-class-path
  SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar --class
 SimpleApp
  SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar
 
  Seems redundant to me since I thought that the JAR as argument is
 copied to
  driver and made available. But this solved it for me so perhaps give
 it a
  try?
 
 
 
  On Wed, Jun 4, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote:
 
  Those aren't the names of the artifacts:
 
 
 
 http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22spark-streaming-twitter_2.10%22
 
  The name is spark-streaming-twitter_2.10
 
  On Wed, Jun 4, 2014 at 1:49 PM, Jeremy Lee
  unorthodox.engine...@gmail.com wrote:
   Man, this has been hard going. Six days, and I finally got a Hello
   World
   App working that I wrote myself.
  
   Now I'm trying to make a minimal streaming app based on the twitter
   examples, (running standalone right now while learning) and when
 running
   it
   like this:
  
   bin/spark-submit --class SimpleApp
   SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar
  
   I'm getting this error

Re: Cassandra examples don't work for me

2014-06-05 Thread Nick Pentreath
You need Cassandra 1.2.6 for the Spark examples.
—
Sent from Mailbox

On Thu, Jun 5, 2014 at 12:02 AM, Tim Kellogg t...@2lemetry.com wrote:

 Hi,
 I’m following the directions to run the cassandra example 
 “org.apache.spark.examples.CassandraTest” and I get this error
 Exception in thread main java.lang.IncompatibleClassChangeError: Found 
 interface org.apache.hadoop.mapreduce.JobContext, but class was expected
 at 
 org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:113)
 at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:90)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at 
 org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:59)
 at 
 org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:370)
 at org.apache.spark.examples.CassandraTest$.main(CassandraTest.scala:100)
 at org.apache.spark.examples.CassandraTest.main(CassandraTest.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 I’m running Cassandra version 2.0.6, and this comes from the 
 spark-1.0.0-bin-hadoop2 distribution package. I am running the example with 
 this commandline:
 bin/run-example org.apache.spark.examples.CassandraTest localhost localhost 
 9160
 I suspect it’s because I’m running the wrong version of Cassandra, but I 
 can’t find the correct version listed anywhere. I hope this is an easy issue 
 to address.
 Much thanks, Tim

Re: compress in-memory cache?

2014-06-05 Thread Nick Pentreath
Have you set the persistence level of the RDD to MEMORY_ONLY_SER (
http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence)?
If you're calling cache, the default persistence level is MEMORY_ONLY so
that setting will have no impact.
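
For example (a sketch, with an illustrative path and an existing SparkContext sc
assumed), since spark.rdd.compress only applies to serialized blocks:

import org.apache.spark.storage.StorageLevel

val data = sc.textFile("hdfs:///data/events")
data.persist(StorageLevel.MEMORY_ONLY_SER)  // serialized in-memory storage; compression can now apply
data.count()                                // materialize the cache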


On Thu, Jun 5, 2014 at 4:41 PM, Xu (Simon) Chen xche...@gmail.com wrote:

 I have a working set larger than available memory, thus I am hoping to
 turn on rdd compression so that I can store more in-memory. Strangely it
 made no difference. The number of cached partitions, fraction cached, and
 size in memory remain the same. Any ideas?

 I confirmed that rdd compression wasn't on before and it was on for the
 second test.

 scala sc.getConf.getAll foreach println
 ...
 (spark.rdd.compress,true)
 ...

 I haven't tried lzo vs snappy, but my guess is that either one should
 provide at least some benefit..

 Thanks.
 -Simon




Re: error loading large files in PySpark 0.9.0

2014-06-07 Thread Nick Pentreath
Ah, looking at that inputformat it should just work out of the box using 
sc.newAPIHadoopFile ...


Would be interested to hear if it works as expected for you (in python you'll 
end up with bytearray values).




N
—
Sent from Mailbox

On Fri, Jun 6, 2014 at 9:38 PM, Jeremy Freeman freeman.jer...@gmail.com
wrote:

 Oh cool, thanks for the heads up! Especially for the Hadoop InputFormat
 support. We recently wrote a custom hadoop input format so we can support
 flat binary files
 (https://github.com/freeman-lab/thunder/tree/master/scala/src/main/scala/thunder/util/io/hadoop),
 and have been testing it in Scala. So I was following Nick's progress and
 was eager to check this out when ready. Will let you guys know how it goes.
 -- J
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/error-loading-large-files-in-PySpark-0-9-0-tp3049p7144.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Are scala.MatchError messages a problem?

2014-06-08 Thread Nick Pentreath
When you use match, the match must be exhaustive. That is, a match error is 
thrown if the match fails. 


That's why you usually handle the default case using case _ => ...




Here it looks like you're taking the text of all statuses, which means not all 
of them will be commands... which means your match will not be exhaustive.




The solution is either to add a default case which does nothing, or probably 
better to add a .filter such that you filter out anything that's not a command 
before matching.
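
Something along these lines (a sketch only, reusing the regex names from the
quoted code below; the println bodies and the name of the string collection are
placeholders):

val cmd_plus  = """[+]([\w]+)""".r
val cmd_minus = """[-]([\w]+)""".r

def handle(cmd: String): Unit = cmd match {
  case cmd_plus(w)  => println(s"add $w")
  case cmd_minus(w) => println(s"remove $w")
  case _            => ()  // default case: silently ignore anything that is not a command
}

// or, filter the candidate strings before matching, e.g.:
// strings.filter(s => cmd_plus.findFirstIn(s).isDefined || cmd_minus.findFirstIn(s).isDefined)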




Just looking at it again, it could also be that you take x => x._2._1 ... What 
type is that? Should it not be a Seq if you're joining, in which case the match 
will also fail...




Hope this helps.
—
Sent from Mailbox

On Sun, Jun 8, 2014 at 6:45 PM, Jeremy Lee unorthodox.engine...@gmail.com
wrote:

 I shut down my first (working) cluster and brought up a fresh one... and
 It's been a bit of a horror and I need to sleep now. Should I be worried
 about these errors? Or did I just have the old log4j.config tuned so I
 didn't see them?
 I
 14/06/08 16:32:52 ERROR scheduler.JobScheduler: Error running job streaming
 job 1402245172000 ms.2
 scala.MatchError: 0101-01-10 (of class java.lang.String)
 at SimpleApp$$anonfun$6$$anonfun$apply$6.apply(SimpleApp.scala:218)
 at SimpleApp$$anonfun$6$$anonfun$apply$6.apply(SimpleApp.scala:217)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at SimpleApp$$anonfun$6.apply(SimpleApp.scala:217)
 at SimpleApp$$anonfun$6.apply(SimpleApp.scala:214)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at
 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 The error comes from this code, which seemed like a sensible way to match
 things:
 (The case cmd_plus(w) statement is generating the error.)
  val cmd_plus = """[+]([\w]+)""".r
  val cmd_minus = """[-]([\w]+)""".r
  // find command user tweets
  val commands = stream.map(
    status => ( status.getUser().getId(), status.getText() )
  ).foreachRDD(rdd => {
    rdd.join(superusers).map(
      x => x._2._1
    ).collect().foreach{ cmd => {
218:    cmd match {
          case cmd_plus(w) => {
            ...
          }
          case cmd_minus(w) => { ... }
        } }} })
 It seems a bit excessive for scala to throw exceptions because a regex
 didn't match. Something feels wrong.

Re: mllib, python and SVD

2014-06-09 Thread Nick Pentreath
Don't think SVD is exposed via MLlib in Python yet,

but you can also check out: https://github.com/ogrisel/spylearn where
Jeremy Freeman put together a numpy-based SVD algorithm (this is a bit
outdated but should still work I assume) (also
https://github.com/freeman-lab/thunder has a PCA implementation).
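
For reference, a minimal Scala sketch of the SVD that MLlib does expose on the
JVM side (the matrix data below is made up and an existing SparkContext sc is
assumed):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(4.0, 5.0, 6.0),
  Vectors.dense(7.0, 8.0, 9.0)
))
val mat = new RowMatrix(rows)
val svd = mat.computeSVD(2, computeU = true)  // top-2 singular values, also compute U
// svd.U, svd.s and svd.V hold the factors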


On Mon, Jun 9, 2014 at 11:32 AM, Håvard Wahl Kongsgård 
haavard.kongsga...@gmail.com wrote:

 Hi, is it possible to do Singular value decomposition (SVD) with python in
 spark(1.0.0)?


 -Havard WK



Re: Optimizing reduce for 'huge' aggregated outputs.

2014-06-10 Thread Nick Pentreath
Can you key your RDD by some key and use reduceByKey? In fact, if you are 
merging a bunch of maps, you can create a set of (k, v) pairs in your mapPartitions and 
then reduceByKey using some merge function. The reduce will happen in parallel 
on multiple nodes in this case. You'll end up with just a single set of k, v 
per partition which you can reduce or collect and merge on the driver.
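
A rough sketch of that pattern (the data and per-partition counting logic are
illustrative; sc is an existing SparkContext):

import org.apache.spark.SparkContext._  // pair RDD functions on older Spark versions

val rdd = sc.parallelize(Seq("a", "b", "a", "c"))  // stands in for the real data

val keyed = rdd.mapPartitions { iter =>
  // build one small map per partition instead of one huge map on the driver
  val counts = scala.collection.mutable.Map.empty[String, Long]
  iter.foreach { x => counts(x) = counts.getOrElse(x, 0L) + 1 }
  counts.iterator
}
val merged = keyed.reduceByKey(_ + _)  // merging happens in parallel on the workers
// merged.collect() (or a further reduce on the driver) only sees already-merged pairs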




—
Sent from Mailbox

On Tue, Jun 10, 2014 at 1:05 AM, Sung Hwan Chung coded...@cs.stanford.edu
wrote:

 I suppose what I want is the memory efficiency of toLocalIterator and the
 speed of collect. Is there any such thing?
 On Mon, Jun 9, 2014 at 3:19 PM, Sung Hwan Chung coded...@cs.stanford.edu
 wrote:
 Hello,

 I noticed that the final reduce function happens in the driver node with a
 code that looks like the following.

 val outputMap = mapPartition(domsomething).reduce(a: Map, b: Map) {
  a.merge(b)
 }

 although individual outputs from mappers are small. Over time the
 aggregated result outputMap could be huuuge (say with hundreds of millions
 of keys and values, reaching giga bytes).

 I noticed that, even if we have a lot of memory in the driver node, this
 process becomes really slow eventually (say we have 100+ partitions; the
 first reduce is fast, but progressively, it becomes veeery slow as more and
 more partition outputs get aggregated). Is this because the intermediate
 reduce output gets serialized and then deserialized every time?

 What I'd like ideally is, since reduce is taking place in the same machine
 any way, there's no need for any serialization and deserialization, and
 just aggregate the incoming results into the final aggregation. Is this
 possible?


RE: Question about RDD cache, unpersist, materialization

2014-06-11 Thread Nick Pentreath
If you want to force materialization, use .count().


Also, if you can, simply don't unpersist anything, unless you really need to free 
the memory.
—
Sent from Mailbox
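
A minimal Scala sketch of the first suggestion (paths and transformations are
made up; sc is an existing SparkContext): cache the derived RDD and count() it
before unpersisting its parent, so the later union never has to rebuild the parent.

val rdd = sc.textFile("/data/input-0").cache()
rdd.map(_.toUpperCase).filter(_.nonEmpty).saveAsTextFile("/data/output-0")

val piece = rdd.map(_.length).filter(_ > 0).cache()
piece.count()    // action: piece is now materialized in memory
rdd.unpersist()  // safe to drop the parent; the union of pieces no longer needs it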

On Wed, Jun 11, 2014 at 5:13 AM, innowireless TaeYun Kim
taeyun@innowireless.co.kr wrote:

 BTW, it is possible that rdd.first() does not compute all the partitions.
 So, first() cannot be used for the situation below.
 -Original Message-
 From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] 
 Sent: Wednesday, June 11, 2014 11:40 AM
 To: user@spark.apache.org
 Subject: Question about RDD cache, unpersist, materialization
 Hi,
 What I (seems to) know about RDD persisting API is as follows:
 - cache() and persist() is not an action. It only does a marking.
 - unpersist() is also not an action. It only removes a marking. But if the
 rdd is already in memory, it is unloaded.
 And there seems no API to forcefully materialize the RDD without requiring a
 data by an action method, for example first().
 So, I am faced with the following scenario.
 {
     JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>());  // create empty for merging
     for (int i = 0; i < 10; i++)
     {
         JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]);
         rdd.cache();  // Since it will be used twice, cache.
         rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]);  // Transform and save, rdd materializes
         rddUnion = rddUnion.union(rdd.map(...).filter(...));  // Do another transform to T and merge by union
         rdd.unpersist();  // Now it seems not needed. (But needed actually)
     }
     // Here, rddUnion actually materializes, and needs all 10 rdds that already unpersisted.
     // So, rebuilding all 10 rdds will occur.
     rddUnion.saveAsTextFile(mergedFileName);
 }
 If rddUnion can be materialized before the rdd.unpersist() line and
 cache()d, the rdds in the loop will not be needed on
 rddUnion.saveAsTextFile().
 Now what is the best strategy?
 - Do not unpersist all 10 rdds in the loop.
 - Materialize rddUnion in the loop by calling 'light' action API, like
 first().
 - Give up and just rebuild/reload all 10 rdds when saving rddUnion.
 Is there some misunderstanding?
 Thanks.

Re: Using CQLSSTableWriter to batch load data from Spark to Cassandra.

2014-06-25 Thread Nick Pentreath
Can you not use a Cassandra OutputFormat? It seems they have BulkOutputFormat.
An example of using it with Hadoop is here:
http://shareitexploreit.blogspot.com/2012/03/bulkloadto-cassandra-with-hadoop.html

Using it with Spark will be similar to the examples:
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
and
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala


On Wed, Jun 25, 2014 at 8:44 PM, Gerard Maas gerard.m...@gmail.com wrote:

 Hi,

 (My excuses for the cross-post from SO)

 I'm trying to create Cassandra SSTables from the results of a batch
 computation in Spark. Ideally, each partition should create the SSTable for
 the data it holds in order to parallelize the process as much as possible
 (and probably even stream it to the Cassandra ring as well)

 After the initial hurdles with the CQLSSTableWriter (like requiring the
 yaml file), I'm confronted now with this issue:

 java.lang.RuntimeException: Attempting to load already loaded column family 
 customer.rawts
 at org.apache.cassandra.config.Schema.load(Schema.java:347)
 at org.apache.cassandra.config.Schema.load(Schema.java:112)
 at 
 org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336)

 I'm creating a writer on each parallel partition like this:

  def store(rdd: RDD[Message]) = {
    rdd.foreachPartition( msgIterator => {
      val writer = CQLSSTableWriter.builder()
        .inDirectory("/tmp/cass")
        .forTable(schema)
        .using(insertSttmt).build()
      msgIterator.foreach(msg => {...})
    })}

 And if I'm reading the exception correctly, I can only create one writer
 per table in one JVM. Digging a bit further in the code, it looks like the
 Schema.load(...) singleton enforces that limitation.

 I guess writings to the writer will not be thread-safe and even if they
 were the contention that multiple threads will create by having all
 parallel tasks trying to dump few GB of data to disk at the same time will
 defeat the purpose of using the SSTables for bulk upload anyway.

 So, are there ways to use the CQLSSTableWriter concurrently?

 If not, what is the next best option to load batch data at high throughput
 in Cassandra?

 Will the upcoming Spark-Cassandra integration help with this? (ie. should
 I just sit back, relax and the problem will solve itself?)

 Thanks,

 Gerard.



Re: Using CQLSSTableWriter to batch load data from Spark to Cassandra.

2014-06-25 Thread Nick Pentreath
Right, ok.

I can't say I've used the Cassandra OutputFormats before. But perhaps if
you use it directly (instead of via Calliope) you may be able to get it to
work, albeit with less concise code?

Or perhaps you may be able to build Cassandra from source with Hadoop 2 /
CDH4 support:
https://groups.google.com/forum/#!topic/nosql-databases/Y-9amAdZk1s




On Wed, Jun 25, 2014 at 9:14 PM, Gerard Maas gerard.m...@gmail.com wrote:

 Thanks Nick.

 We used the CassandraOutputFormat through Calliope. The Calliope API makes
 the CassandraOutputFormat quite accessible  and is cool to work with.  It
 worked fine at prototype level, but we had Hadoop version conflicts when we
 put it in our Spark environment (Using our Spark assembly compiled with
 CDH4.4). The conflict seems to be at the Cassandra-all lib level, which is
 compiled against a different hadoop version  (v1).

 We could not get round that issue. (Any pointers in that direction?)

 That's why I'm trying the direct CQLSSTableWriter way but it looks blocked
 as well.

  -kr, Gerard.




 On Wed, Jun 25, 2014 at 8:57 PM, Nick Pentreath nick.pentre...@gmail.com
 wrote:

 can you not use a Cassandra OutputFormat? Seems they have
 BulkOutputFormat. An example of using it with Hadoop is here:
 http://shareitexploreit.blogspot.com/2012/03/bulkloadto-cassandra-with-hadoop.html

 Using it with Spark will be similar to the examples:
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
 and
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala


 On Wed, Jun 25, 2014 at 8:44 PM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Hi,

 (My excuses for the cross-post from SO)

 I'm trying to create Cassandra SSTables from the results of a batch
 computation in Spark. Ideally, each partition should create the SSTable for
 the data it holds in order to parallelize the process as much as possible
 (and probably even stream it to the Cassandra ring as well)

 After the initial hurdles with the CQLSSTableWriter (like requiring the
 yaml file), I'm confronted now with this issue:


 java.lang.RuntimeException: Attempting to load already loaded column family 
 customer.rawts
 at org.apache.cassandra.config.Schema.load(Schema.java:347)
 at org.apache.cassandra.config.Schema.load(Schema.java:112)
 at 
 org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336)

 I'm creating a writer on each parallel partition like this:


 def store(rdd: RDD[Message]) = {
   rdd.foreachPartition( msgIterator => {
     val writer = CQLSSTableWriter.builder()
       .inDirectory("/tmp/cass")
       .forTable(schema)
       .using(insertSttmt).build()
     msgIterator.foreach(msg => {...})
   })
 }

 And if I'm reading the exception correctly, I can only create one writer
 per table in one JVM. Digging a bit further in the code, it looks like the
 Schema.load(...) singleton enforces that limitation.

 I guess writings to the writer will not be thread-safe and even if they
 were the contention that multiple threads will create by having all
 parallel tasks trying to dump few GB of data to disk at the same time will
 defeat the purpose of using the SSTables for bulk upload anyway.

 So, are there ways to use the CQLSSTableWriter concurrently?

 If not, what is the next best option to load batch data at high
 throughput in Cassandra?

 Will the upcoming Spark-Cassandra integration help with this? (ie.
 should I just sit back, relax and the problem will solve itself?)

 Thanks,

 Gerard.






Re: ElasticSearch enrich

2014-06-26 Thread Nick Pentreath
You can just add elasticsearch-hadoop as a dependency to your project to
user the ESInputFormat and ESOutputFormat (
https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics
here:
http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html

For testing, yes I think you will need to start ES in local mode (just
./bin/elasticsearch) and use the default config (host = localhost, port =
9200).


On Thu, Jun 26, 2014 at 9:04 AM, boci boci.b...@gmail.com wrote:

 That's okay, but hadoop has ES integration. What happens if I run
 saveAsHadoopFile without hadoop (or do I need to pull up hadoop
 programmatically? (if I can))

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau hol...@pigscanfly.ca
 wrote:



 On Wed, Jun 25, 2014 at 4:16 PM, boci boci.b...@gmail.com wrote:

 Hi guys, thanks for the direction. Now I have some problems/questions:
 - in local (test) mode I want to use ElasticClient.local to create the es
 connection, but in production I want to use ElasticClient.remote; to do this I
 want to pass ElasticClient to mapPartitions, or what is the best
 practice?

 In this case you probably want to make the ElasticClient inside of
 mapPartitions (since it isn't serializable) and if you want to use a
 different client in local mode just have a flag that controls what type of
 client you create.
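
 A rough sketch of that pattern (EsClient below is just a stand-in stub, not the real
 elastic4s or elasticsearch API; the point is only where the client gets created):

 // stand-in for a non-serializable client; swap in ElasticClient.local / .remote here
 class EsClient(host: String, port: Int) {
   def enrich(doc: String): String = s"$doc @ $host:$port"   // placeholder for a real ES lookup
 }

 val useLocal = true   // e.g. a config flag: local/test mode vs production
 val docs = sc.parallelize(Seq("doc1", "doc2", "doc3"))

 val enriched = docs.mapPartitions { it =>
   // created on the worker, once per partition, so it is never serialized from the driver
   val client =
     if (useLocal) new EsClient("localhost", 9200)
     else new EsClient("es.prod.example.com", 9200)
   it.map(client.enrich)
 }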

 - my stream output is written into elasticsearch. How can I
 test output.saveAsHadoopFile[ESOutputFormat]("-") in a local environment?

 - After storing the enriched data into ES, I want to generate aggregated
 data (EsInputFormat); how can I test it locally?

 I think the simplest thing to do would be to use the same client in both modes and
 just start a single node elasticsearch cluster.


 Thanks guys

 b0c1




 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau hol...@pigscanfly.ca
 wrote:

 So I'm giving a talk at the Spark summit on using Spark &
 ElasticSearch, but for now if you want to see a simple demo which uses
 elasticsearch for geo input you can take a look at my quick & dirty
 implementation with TopTweetsInALocation (
 https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
 ). This approach uses the ESInputFormat which avoids the difficulty of
 having to manually create ElasticSearch clients.

 This approach might not work for your data, e.g. if you need to create
 a query for each record in your RDD. If this is the case, you could instead
 look at using mapPartitions and setting up your Elasticsearch connection
 inside of that, so you could then re-use the client for all of the queries
 on each partition. This approach will avoid having to serialize the
 Elasticsearch connection because it will be local to your function.

 Hope this helps!

 Cheers,

 Holden :)


 On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi mayur.rust...@gmail.com
  wrote:

 It's not used as the default serializer due to some issues with compatibility
 & the requirement to register the classes.

 Which part are you getting as nonserializable... you need to serialize
 that class if you are sending it to spark workers inside a map, reduce ,
 mappartition or any of the operations on RDD.


 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng pc...@uow.edu.au wrote:

 I'm afraid persisting a connection across two tasks is a dangerous act, as they
 can't be guaranteed to be executed on the same machine. Your ES server may
 think it's a man-in-the-middle attack!

 I think it's possible to invoke a static method that gives you a connection from
 a local 'pool', so nothing will sneak into your closure, but it's too complex
 and there should be a better option.

 Never used kryo before; if it's that good perhaps we should use it as the
 default serializer



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.





 --
 Cell : 425-233-8271





 --
 Cell : 425-233-8271





Re: Sample datasets for MLlib and Graphx

2014-07-03 Thread Nick Pentreath
Take a look at Kaggle competition datasets - https://www.kaggle.com/competitions




For svm there are a couple of ad click prediction datasets of pretty large size.




For graph stuff the SNAP has large network data: https://snap.stanford.edu/data/



—
Sent from Mailbox

On Thu, Jul 3, 2014 at 3:25 PM, AlexanderRiggers
alexander.rigg...@gmail.com wrote:

 Hello!
 I want to play around with several different cluster settings and measure
 performance for MLlib and GraphX, and was wondering if anybody here could
 hit me up with datasets for these applications from 5GB onwards?
 I'm mostly interested in SVM and Triangle Count, but would be glad for any
 help.
 Best regards,
 Alex
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Sample-datasets-for-MLlib-and-Graphx-tp8760.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Sample datasets for MLlib and Graphx

2014-07-03 Thread Nick Pentreath
The Kaggle data is not in libsvm format so you'd have to do some transformation.


The Criteo and KDD cup datasets are if I recall fairly large. Criteo ad 
prediction data is around 2-3GB compressed I think.




To my knowledge these are the largest binary classification datasets I've come 
across which are easily publicly available (very happy to be proved wrong about 
this though :)
—
Sent from Mailbox

On Thu, Jul 3, 2014 at 4:39 PM, AlexanderRiggers
alexander.rigg...@gmail.com wrote:

 Nick Pentreath wrote
 Take a look at Kaggle competition datasets
 - https://www.kaggle.com/competitions
 I was looking for files in LIBSVM format and never found anything on Kaggle
 of a bigger size. Most competitions I've seen need data processing and feature
 generation, but maybe I have to take a second look.
 Nick Pentreath wrote
 For graph stuff the SNAP has large network
 data: https://snap.stanford.edu/data/
 Thanks
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Sample-datasets-for-MLlib-and-Graphx-tp8760p8762.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: DynamoDB input source

2014-07-04 Thread Nick Pentreath
You should be able to use DynamoDBInputFormat (I think this should be part
of AWS libraries for Java) and create a HadoopRDD from that.


On Fri, Jul 4, 2014 at 8:28 AM, Ian Wilkinson ia...@me.com wrote:

 Hi,

 I noticed mention of DynamoDB as input source in

 http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf
 .

 Unfortunately, Google is not coming to my rescue on finding
 further mention for this support.

 Any pointers would be well received.

 Big thanks,
 ian



Re: DynamoDB input source

2014-07-04 Thread Nick Pentreath
No boto support for that. 

In master there is Python support for loading Hadoop inputFormat. Not sure if 
it will be in 1.0.1 or 1.1

In the master docs under the programming guide there are instructions, and also under the 
examples project there are pyspark examples of using Cassandra and HBase. These 
should hopefully give you enough to get started. 

Depending on how easy it is to use the DynamoDB format, you may have to write 
a custom converter (see the mentioned examples for more details).

Sent from my iPhone

 On 4 Jul 2014, at 08:38, Ian Wilkinson ia...@me.com wrote:
 
 Hi Nick,
 
 I’m going to be working with python primarily. Are you aware of
 comparable boto support?
 
 ian
 
 On 4 Jul 2014, at 16:32, Nick Pentreath nick.pentre...@gmail.com wrote:
 
 You should be able to use DynamoDBInputFormat (I think this should be part 
 of AWS libraries for Java) and create a HadoopRDD from that.
 
 
 On Fri, Jul 4, 2014 at 8:28 AM, Ian Wilkinson ia...@me.com wrote:
 Hi,
 
 I noticed mention of DynamoDB as input source in
 http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf.
 
 Unfortunately, Google is not coming to my rescue on finding
 further mention for this support.
 
 Any pointers would be well received.
 
 Big thanks,
 ian
 


Re: DynamoDB input source

2014-07-04 Thread Nick Pentreath
I should qualify by saying there is boto support for dynamodb - but not for the 
inputFormat. You could roll your own python-based connection but this involves 
figuring out how to split the data in dynamo - inputFormat takes care of this 
so should be the easier approach —
Sent from Mailbox

On Fri, Jul 4, 2014 at 8:51 AM, Ian Wilkinson ia...@me.com wrote:

 Excellent. Let me get browsing on this.
 Huge thanks,
 ian
 On 4 Jul 2014, at 16:47, Nick Pentreath nick.pentre...@gmail.com wrote:
 No boto support for that. 
 
 In master there is Python support for loading Hadoop inputFormat. Not sure 
 if it will be in 1.0.1 or 1.1
 
 I master docs under the programming guide are instructions and also under 
 examples project there are pyspark examples of using Cassandra and HBase. 
 These should hopefully give you enough to get started. 
 
 Depending on how easy it is to use the dynamo DB format, you may have to 
 write a custom converter (see the mentioned examples for storm details).
 
 Sent from my iPhone
 
 On 4 Jul 2014, at 08:38, Ian Wilkinson ia...@me.com wrote:
 
 Hi Nick,
 
 I’m going to be working with python primarily. Are you aware of
 comparable boto support?
 
 ian
 
 On 4 Jul 2014, at 16:32, Nick Pentreath nick.pentre...@gmail.com wrote:
 
 You should be able to use DynamoDBInputFormat (I think this should be part 
 of AWS libraries for Java) and create a HadoopRDD from that.
 
 
 On Fri, Jul 4, 2014 at 8:28 AM, Ian Wilkinson ia...@me.com wrote:
 Hi,
 
 I noticed mention of DynamoDB as input source in
 http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf.
 
 Unfortunately, Google is not coming to my rescue on finding
 further mention for this support.
 
 Any pointers would be well received.
 
 Big thanks,
 ian
 
 

Re: DynamoDB input source

2014-07-04 Thread Nick Pentreath
Interesting - I would have thought they would make that available publicly.
Unfortunately, unless you can use Spark on EMR, I guess your options are to
hack it by spinning up an EMR cluster and getting the JAR, or maybe fall
back to using boto and rolling your own :(



On Fri, Jul 4, 2014 at 9:28 AM, Ian Wilkinson ia...@me.com wrote:

 Trying to discover source for the DynamoDBInputFormat.
 Not appearing in:

 - https://github.com/aws/aws-sdk-java
 - https://github.com/apache/hive

 Then came across
 http://stackoverflow.com/questions/1704/jar-containing-org-apache-hadoop-hive-dynamodb
 .
 Unsure whether this represents the latest situation…

 ian


 On 4 Jul 2014, at 16:58, Nick Pentreath nick.pentre...@gmail.com wrote:

 I should qualify by saying there is boto support for dynamodb - but not
 for the inputFormat. You could roll your own python-based connection but
 this involves figuring out how to split the data in dynamo - inputFormat
 takes care of this so should be the easier approach
 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Fri, Jul 4, 2014 at 8:51 AM, Ian Wilkinson ia...@me.com wrote:

 Excellent. Let me get browsing on this.

 Huge thanks,
 ian


  On 4 Jul 2014, at 16:47, Nick Pentreath nick.pentre...@gmail.com
 wrote:

 No boto support for that.

 In master there is Python support for loading Hadoop inputFormat. Not
 sure if it will be in 1.0.1 or 1.1

 I master docs under the programming guide are instructions and also under
 examples project there are pyspark examples of using Cassandra and HBase.
 These should hopefully give you enough to get started.

 Depending on how easy it is to use the dynamo DB format, you may have to
 write a custom converter (see the mentioned examples for storm details).

 Sent from my iPhone

 On 4 Jul 2014, at 08:38, Ian Wilkinson ia...@me.com wrote:

 Hi Nick,

 I’m going to be working with python primarily. Are you aware of
 comparable boto support?

 ian

  On 4 Jul 2014, at 16:32, Nick Pentreath nick.pentre...@gmail.com
 wrote:

 You should be able to use DynamoDBInputFormat (I think this should be
 part of AWS libraries for Java) and create a HadoopRDD from that.


 On Fri, Jul 4, 2014 at 8:28 AM, Ian Wilkinson ia...@me.com wrote:

 Hi,

 I noticed mention of DynamoDB as input source in

 http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf
 .

 Unfortunately, Google is not coming to my rescue on finding
 further mention for this support.

 Any pointers would be well received.

 Big thanks,
 ian









Re: taking top k values of rdd

2014-07-05 Thread Nick Pentreath
To make it efficient in your case you may need to do a bit of custom code to 
emit the top k per partition and then only send those to the driver. On the 
driver you can just top k the combined top k from each partition (assuming you 
have (object, count) for each top k list).
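
A rough sketch of that, assuming an RDD of (item, count) pairs (newer Spark versions
also have rdd.top(k) / takeOrdered, which do roughly this internally):

val k = 10
val counts = sc.parallelize(Seq(("a", 5L), ("b", 3L), ("c", 9L)))   // toy data

val topK = counts
  .mapPartitions { iter =>
    // keep only the k largest counts of this partition
    iter.toArray.sortBy(-_._2).take(k).iterator
  }
  .collect()           // at most k * numPartitions records reach the driver
  .sortBy(-_._2)
  .take(k)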

—
Sent from Mailbox

On Sat, Jul 5, 2014 at 10:17 AM, Koert Kuipers ko...@tresata.com wrote:

 my initial approach to taking top k values of a rdd was using a
 priority-queue monoid. along these lines:
 rdd.mapPartitions({ items => Iterator.single(new PriorityQueue(...)) },
 false).reduce(monoid.plus)
 this works fine, but looking at the code for reduce it first reduces within
 a partition (which doesnt help me) and then sends the results to the driver
 where these again get reduced. this means that for every partition the
 (potentially very bulky) priorityqueue gets shipped to the driver.
 my driver is client side, not inside cluster, and i cannot change this, so
 this shipping to driver of all these queues can be expensive.
 is there a better way to do this? should i try a shuffle first to reduce
 the partitions to the minimal amount (since number of queues shipped is
 equal to number of partitions)?
 is there a way to reduce to a single item RDD, so the queues stay inside the
 cluster and i can retrieve the final result with RDD.first?

Re: taking top k values of rdd

2014-07-05 Thread Nick Pentreath
Right. That is unavoidable unless as you say you repartition into 1 partition, 
which may do the trick.


When I say send the top k per partition I don't mean send the pq but the actual 
values. This may end up being relatively small if k and p are not too big. (I'm 
not sure how large serialized pq is).
—
Sent from Mailbox

On Sat, Jul 5, 2014 at 10:29 AM, Koert Kuipers ko...@tresata.com wrote:

 hey nick,
 you are right. i didnt explain myself well and my code example was wrong...
 i am keeping a priority-queue with k items per partition (using
 com.twitter.algebird.mutable.PriorityQueueMonoid.build to limit the sizes
 of the queues).
 but this still means i am sending k items per partition to my driver, so k
 x p, while i only need k.
 thanks! koert
 On Sat, Jul 5, 2014 at 1:21 PM, Nick Pentreath nick.pentre...@gmail.com
 wrote:
 To make it efficient in your case you may need to do a bit of custom code
 to emit the top k per partition and then only send those to the driver. On
 the driver you can just top k the combined top k from each partition
 (assuming you have (object, count) for each top k list).

 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Sat, Jul 5, 2014 at 10:17 AM, Koert Kuipers ko...@tresata.com wrote:

 my initial approach to taking top k values of a rdd was using a
 priority-queue monoid. along these lines:

  rdd.mapPartitions({ items => Iterator.single(new PriorityQueue(...)) },
 false).reduce(monoid.plus)

 this works fine, but looking at the code for reduce it first reduces
 within a partition (which doesnt help me) and then sends the results to the
 driver where these again get reduced. this means that for every partition
 the (potentially very bulky) priorityqueue gets shipped to the driver.

 my driver is client side, not inside cluster, and i cannot change this,
 so this shipping to driver of all these queues can be expensive.

 is there a better way to do this? should i try to a shuffle first to
 reduce the partitions to the minimal amount (since number of queues shipped
 is equal to number of partitions)?

 is was a way to reduce to a single item RDD, so the queues stay inside
 cluster and i can retrieve the final result with RDD.first?




Re: How to parallelize model fitting with different cross-validation folds?

2014-07-05 Thread Nick Pentreath
For linear models the 3rd option is by far the most efficient, and I suspect it's what 
Evan is alluding to. 


Unfortunately it's not directly possible with the classes in Mllib now so 
you'll have to roll your own using underlying sgd / bfgs primitives.
—
Sent from Mailbox

On Sat, Jul 5, 2014 at 10:45 AM, Christopher Nguyen c...@adatao.com
wrote:

 Hi sparkuser2345,
 I'm inferring the problem statement is something like how do I make this
 complete faster (given my compute resources)?
 Several comments.
 First, Spark only allows launching parallel tasks from the driver, not from
 workers, which is why you're seeing the exception when you try. Whether the
 latter is a sensible/doable idea is another discussion, but I can
 appreciate why many people assume this should be possible.
 Second, on optimization, you may be able to apply Sean's idea about
 (thread) parallelism at the driver, combined with the knowledge that often
 these cluster tasks bottleneck while competing for the same resources at
 the same time (cpu vs disk vs network, etc.) You may be able to achieve
 some performance optimization by randomizing these timings. This is not
 unlike GMail randomizing user storage locations around the world for load
 balancing. Here, you would partition each of your RDDs into a different
 number of partitions, making some tasks larger than others, and thus some
 may be in cpu-intensive map while others are shuffling data around the
 network. This is rather cluster-specific; I'd be interested in what you
 learn from such an exercise.
 Third, I find it useful always to consider doing as much as possible in one
 pass, subject to memory limits, e.g., mapPartitions() vs map(), thus
 minimizing map/shuffle/reduce boundaries with their context switches and
 data shuffling. In this case, notice how you're running the
 training+prediction k times over mostly the same rows, with map/reduce
 boundaries in between. While the training phase is sealed in this context,
 you may be able to improve performance by collecting all the k models
 together, and do a [m x k] predictions all at once which may end up being
 faster.
 Finally, as implied from the above, for the very common k-fold
 cross-validation pattern, the algorithm itself might be written to be smart
 enough to take both train and test data and do the right thing within
 itself, thus obviating the need for the user to prepare k data sets and
 running over them serially, and likely saving a lot of repeated
 computations in the right internal places.
 Enjoy,
 --
 Christopher T. Nguyen
 Co-founder  CEO, Adatao http://adatao.com
 linkedin.com/in/ctnguyen
 On Sat, Jul 5, 2014 at 1:50 AM, Sean Owen so...@cloudera.com wrote:
 If you call .par on data_kfolded it will become a parallel collection in
 Scala and so the maps will happen in parallel .
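
 An illustrative sketch of that driver-side parallelism, assuming the array of
 (train, test) RDD pairs built in the quoted code below:

 import org.apache.spark.mllib.classification.SVMWithSGD

 val validationErrors = data_kfolded.par.map { case (train, test) =>
   val model = SVMWithSGD.train(train, 100)   // 100 iterations, just as an example
   val labelAndPreds = test.map(p => (p.label, model.predict(p.features)))
   labelAndPreds.filter(r => r._1 != r._2).count.toDouble / test.count
 }.toArray

 Each fold is then submitted as a concurrent job on the same SparkContext from the driver.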
 On Jul 5, 2014 9:35 AM, sparkuser2345 hm.spark.u...@gmail.com wrote:

 Hi,

 I am trying to fit a logistic regression model with cross validation in
 Spark 0.9.0 using SVMWithSGD. I have created an array data_kfolded where
 each element is a pair of RDDs containing the training and test data:

 (training_data: RDD[org.apache.spark.mllib.regression.LabeledPoint],
  test_data: RDD[org.apache.spark.mllib.regression.LabeledPoint])

 scala> data_kfolded
 res21: Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint],
 org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])] =
 Array((MappedRDD[9] at map at <console>:24, MappedRDD[7] at map at <console>:23),
 (MappedRDD[13] at map at <console>:24, MappedRDD[11] at map at <console>:23),
 (MappedRDD[17] at map at <console>:24, MappedRDD[15] at map at <console>:23))

 Everything works fine when using data_kfolded:

 val validationErrors =
 data_kfolded.map { datafold =>
   val svmAlg = new SVMWithSGD()
   val model_reg = svmAlg.run(datafold._1)
   val labelAndPreds = datafold._2.map { point =>
     val prediction = model_reg.predict(point.features)
     (point.label, prediction)
   }
   val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
     datafold._2.count
   trainErr.toDouble
 }

 scala> validationErrors
 res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837,
 0.29833546734955185)

 However, I have understood that the models are not fitted in parallel as
 data_kfolded is not an RDD (although it's an array of pairs of RDDs). When
 running the same code where data_kfolded has been replaced with
 sc.parallelize(data_kfolded), I get a null pointer exception from the line
 where the run method of the SVMWithSGD object is called with the training
 data. I guess this is somehow related to the fact that RDDs can't be
 accessed from inside a closure. I fail to understand though why the first
 version works and the second doesn't. Most importantly, is there a way to
 fit the models in parallel? I would really appreciate your help.

 val validationErrors =
 sc.parallelize(data_kfolded).map { datafold =>
   val svmAlg = new SVMWithSGD()
   val 

Re: Recommended pipeline automation tool? Oozie?

2014-07-11 Thread Nick Pentreath
You may look into the new Azkaban - which while being quite heavyweight is 
actually quite pleasant to use when set up.


You can run spark jobs (spark-submit) using azkaban shell commands and pass 
parameters between jobs. It supports dependencies, simple dags and scheduling 
with retries. 




I'm digging deeper and it may be worthwhile extending it with a Spark job 
type...




It's probably best for mixed Hadoop / Spark clusters...
—
Sent from Mailbox

On Fri, Jul 11, 2014 at 12:52 AM, Andrei faithlessfri...@gmail.com
wrote:

 I used both - Oozie and Luigi - but found them inflexible and still
 overcomplicated, especially in presence of Spark.
 Oozie has a fixed list of building blocks, which is pretty limiting. For
 example, you can launch Hive query, but Impala, Shark/SparkSQL, etc. are
 out of scope (of course, you can always write wrapper as Java or Shell
 action, but does it really need to be so complicated?). Another issue with
 Oozie is passing variables between actions. There's Oozie context that is
 suitable for passing key-value pairs (both strings) between actions, but
 for more complex objects (say, FileInputStream that should be closed at
 last step only) you have to do some advanced kung fu.
 Luigi, on other hand, has its niche - complicated dataflows with many tasks
 that depend on each other. Basically, there are tasks (this is where you
 define computations) and targets (something that can exist - file on
 disk, entry in ZooKeeper, etc.). You ask Luigi to get some target, and it
 creates a plan for achieving this. Luigi is really shiny when your workflow
 fits this model, but one step away and you are in trouble. For example,
 consider simple pipeline: run MR job and output temporary data, run another
 MR job and output final data, clean temporary data. You can make target
 Clean, that depends on target MRJob2 that, in its turn, depends on MRJob1,
 right? Not so easy. How do you check that Clean task is achieved? If you
 just test whether temporary directory is empty or not, you catch both cases
 - when all tasks are done and when they are not even started yet. Luigi
 allows you to specify all 3 actions - MRJob1, MRJob2, Clean - in a single
 run() method, but ruins the entire idea.
 And of course, both of these frameworks are optimized for standard
 MapReduce jobs, which is probably not what you want on Spark mailing list
 :)
 Experience with these frameworks, however, gave me some insights about
 typical data pipelines.
 1. Pipelines are mostly linear. Oozie, Luigi and number of other frameworks
 allow branching, but most pipelines actually consist of moving data from
 source to destination with possibly some transformations in between (I'll
 be glad if somebody share use cases when you really need branching).
 2. Transactional logic is important. Either everything, or nothing.
 Otherwise it's really easy to get into inconsistent state.
 3. Extensibility is important. You never know what you will need in a week or
 two.
 So eventually I decided that it is much easier to create your own pipeline
 instead of trying to adopt your code to existing frameworks. My latest
 pipeline incarnation simply consists of a list of steps that are started
 sequentially. Each step is a class with at least these methods:
  * run() - launch this step
  * fail() - what to do if step fails
  * finalize() - (optional) what to do when all steps are done
 For example, if you want to add possibility to run Spark jobs, you just
 create SparkStep and configure it with required code. If you want Hive
 query - just create HiveStep and configure it with Hive connection
 settings. I use a YAML file to configure steps and a Context (basically, a
 Map[String, Any]) to pass variables between them. I also use a configurable
 Reporter, available to all steps, to report the progress.
 Hopefully, this will give you some insights about best pipeline for your
 specific case.
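
 A tiny sketch of that kind of step abstraction (all names here are illustrative, not
 from any existing framework):

 import scala.collection.mutable

 trait Step {
   def run(context: mutable.Map[String, Any]): Unit
   def fail(context: mutable.Map[String, Any], e: Throwable): Unit =
     println(s"step failed: ${e.getMessage}")                        // default: just report
   def finalizeStep(context: mutable.Map[String, Any]): Unit = ()    // optional cleanup
 }

 // e.g. a step that shells out to spark-submit (jar / main class are placeholders)
 class SparkStep(jar: String, mainClass: String) extends Step {
   def run(context: mutable.Map[String, Any]): Unit = {
     import sys.process._
     val exit = Seq("spark-submit", "--class", mainClass, jar).!
     if (exit != 0) sys.error(s"spark-submit exited with $exit")
   }
 }

 def runPipeline(steps: Seq[Step]): Unit = {
   val context = mutable.Map.empty[String, Any]
   try {
     steps.foreach { s =>
       try s.run(context)
       catch { case e: Throwable => s.fail(context, e); throw e }    // stop the pipeline
     }
   } finally steps.foreach(_.finalizeStep(context))
 }
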
 On Thu, Jul 10, 2014 at 9:10 PM, Paul Brown p...@mult.ifario.us wrote:

 We use Luigi for this purpose.  (Our pipelines are typically on AWS (no
 EMR) backed by S3 and using combinations of Python jobs, non-Spark
 Java/Scala, and Spark.  We run Spark jobs by connecting drivers/clients to
 the master, and those are what is invoked from Luigi.)

 —
 p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/


 On Thu, Jul 10, 2014 at 10:20 AM, k.tham kevins...@gmail.com wrote:

 I'm just wondering what's the general recommendation for data pipeline
 automation.

 Say, I want to run Spark Job A, then B, then invoke script C, then do D,
 and
 if D fails, do E, and if Job A fails, send email F, etc...

 It looks like Oozie might be the best choice. But I'd like some
 advice/suggestions.

 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Recommended-pipeline-automation-tool-Oozie-tp9319.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Recommended pipeline automation tool? Oozie?

2014-07-11 Thread Nick Pentreath
Did you use old azkaban or azkaban 2.5? It has been completely rewritten.

Not saying it is the best but I found it way better than oozie for example.

Sent from my iPhone

 On 11 Jul 2014, at 09:24, 明风 mingf...@taobao.com wrote:
 
 We used Azkaban for a short time and suffered a lot. In the end we almost rewrote 
 it completely. I really don't recommend it.
 
 From: Nick Pentreath nick.pentre...@gmail.com
 Reply-To: user@spark.apache.org
 Date: Friday, 11 July 2014 at 3:18 PM
 To: user@spark.apache.org
 Subject: Re: Recommended pipeline automation tool? Oozie?
 
 You may look into the new Azkaban - which while being quite heavyweight is 
 actually quite pleasant to use when set up.
 
 You can run spark jobs (spark-submit) using azkaban shell commands and pass 
 paremeters between jobs. It supports dependencies, simple dags and scheduling 
 with retries. 
 
 I'm digging deeper and it may be worthwhile extending it with a Spark job 
 type...
 
 It's probably best for mixed Hadoop / Spark clusters...
 —
 Sent from Mailbox
 
 
 On Fri, Jul 11, 2014 at 12:52 AM, Andrei faithlessfri...@gmail.com wrote:
 I used both - Oozie and Luigi - but found them inflexible and still 
 overcomplicated, especially in presence of Spark. 
 
 Oozie has a fixed list of building blocks, which is pretty limiting. For 
 example, you can launch Hive query, but Impala, Shark/SparkSQL, etc. are out 
 of scope (of course, you can always write wrapper as Java or Shell action, 
 but does it really need to be so complicated?). Another issue with Oozie is 
 passing variables between actions. There's Oozie context that is suitable 
 for passing key-value pairs (both strings) between actions, but for more 
 complex objects (say, FileInputStream that should be closed at last step 
 only) you have to do some advanced kung fu. 
 
 Luigi, on other hand, has its niche - complicated dataflows with many tasks 
 that depend on each other. Basically, there are tasks (this is where you 
 define computations) and targets (something that can exist - file on disk, 
 entry in ZooKeeper, etc.). You ask Luigi to get some target, and it creates 
 a plan for achieving this. Luigi is really shiny when your workflow fits 
 this model, but one step away and you are in trouble. For example, consider 
 simple pipeline: run MR job and output temporary data, run another MR job 
 and output final data, clean temporary data. You can make target Clean, that 
 depends on target MRJob2 that, in its turn, depends on MRJob1, right? Not so 
 easy. How do you check that Clean task is achieved? If you just test whether 
 temporary directory is empty or not, you catch both cases - when all tasks 
 are done and when they are not even started yet. Luigi allows you to specify 
 all 3 actions - MRJob1, MRJob2, Clean - in a single run() method, but 
 ruins the entire idea. 
 
 And of course, both of these frameworks are optimized for standard MapReduce 
 jobs, which is probably not what you want on Spark mailing list :) 
 
 Experience with these frameworks, however, gave me some insights about 
 typical data pipelines. 
 
 1. Pipelines are mostly linear. Oozie, Luigi and number of other frameworks 
 allow branching, but most pipelines actually consist of moving data from 
 source to destination with possibly some transformations in between (I'll be 
 glad if somebody share use cases when you really need branching). 
 2. Transactional logic is important. Either everything, or nothing. 
 Otherwise it's really easy to get into inconsistent state. 
 3. Extensibility is important. You never know what will need in a week or 
 two. 
 
 So eventually I decided that it is much easier to create your own pipeline 
 instead of trying to adopt your code to existing frameworks. My latest 
 pipeline incarnation simply consists of a list of steps that are started 
 sequentially. Each step is a class with at least these methods: 
 
  * run() - launch this step
  * fail() - what to do if step fails
  * finalize() - (optional) what to do when all steps are done
 
 For example, if you want to add possibility to run Spark jobs, you just 
 create SparkStep and configure it with required code. If you want Hive query 
 - just create HiveStep and configure it with Hive connection settings. I use 
 YAML file to configure steps and Context (basically, Map[String, Any]) to 
 pass variables between them. I also use configurable Reporter available for 
 all steps to report the progress. 
 
 Hopefully, this will give you some insights about best pipeline for your 
 specific case. 
 
 
 
 On Thu, Jul 10, 2014 at 9:10 PM, Paul Brown p...@mult.ifario.us wrote:
 
 We use Luigi for this purpose.  (Our pipelines are typically on AWS (no 
 EMR) backed by S3 and using combinations of Python jobs, non-Spark 
 Java/Scala, and Spark.  We run Spark jobs by connecting drivers/clients to 
 the master, and those are what is invoked from Luigi.)
 
 —
 p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/
 
 
 On Thu, Jul 10, 2014 at 10:20 AM, k.tham

Re: import org.apache.spark.streaming.twitter._ in Shell

2014-07-15 Thread Nick Pentreath
You could try the following: create a minimal project using sbt or Maven,
add spark-streaming-twitter as a dependency, run sbt assembly (or mvn
package) on that to create a fat jar (with Spark as provided dependency),
and add that to the shell classpath when starting up.
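
For example, a minimal build.sbt for such a helper project might look like this (version
numbers are illustrative for the 1.0.x era, and the sbt-assembly plugin must be added in
project/plugins.sbt):

// build.sbt
name := "spark-shell-twitter-deps"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"              % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming"         % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.0.1"
)

After sbt assembly, start the shell with something like
bin/spark-shell --jars target/scala-2.10/spark-shell-twitter-deps-assembly-0.1.jar
(the exact jar name depends on your build settings).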


On Tue, Jul 15, 2014 at 9:06 AM, Praveen Seluka psel...@qubole.com wrote:

 If you want to make Twitter* classes available in your shell, I believe
 you could do the following
 1. Change the parent pom module ordering - Move external/twitter before
 assembly
 2. In assembly/pom.xml, add the external/twitter dependency - this will package
 twitter* into the assembly jar

 Now when spark-shell is launched, assembly jar is in classpath - hence
 twitter* too. I think this will work (remember trying this sometime back)


 On Tue, Jul 15, 2014 at 11:59 AM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Hmm, I'd like to clarify something from your comments, Tathagata.

 Going forward, is Twitter Streaming functionality not supported from the
 shell? What should users do if they'd like to process live Tweets from the
 shell?

 Nick


 On Mon, Jul 14, 2014 at 11:50 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 At some point, you were able to access TwitterUtils from spark shell
 using Spark 1.0.0+ ?


 Yep.


 If yes, then what change in Spark caused it to not work any more?


 It still works for me. I was just commenting on your remark that it
 doesn't work through the shell, which I now understand to apply to versions
 of Spark before 1.0.0.

  Nick






Re: Count distinct with groupBy usage

2014-07-15 Thread Nick Pentreath
You can use .distinct.count on your user RDD.


What are you trying to achieve with the time group by?
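
For the per-page, per-day unique user counts described in the quoted question below,
one possible sketch (assuming the CSV layout shown there, with epoch-second timestamps):

val lines = sc.textFile("data.csv")
val uniquesPerDayPage = lines
  .map(_.split(","))
  .map { case Array(ts, page, user) =>
    val day = new java.text.SimpleDateFormat("yyyy-MM-dd")
      .format(new java.util.Date(ts.toLong * 1000L))
    ((day, page), user)
  }
  .distinct()                          // one record per (day, page, user)
  .map { case (key, _) => (key, 1L) }
  .reduceByKey(_ + _)                  // unique user count per (day, page)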
—
Sent from Mailbox

On Tue, Jul 15, 2014 at 8:14 PM, buntu buntu...@gmail.com wrote:

 Hi --
 New to Spark and trying to figure out how to generate unique counts per
 page by date given this raw data:
 timestamp,page,userId
 1405377264,google,user1
 1405378589,google,user2
 1405380012,yahoo,user1
 ..
 I can do a groupBy on a field and get the count:
 val lines = sc.textFile("data.csv")
 val csv = lines.map(_.split(","))
 // group by page
 csv.groupBy(_(1)).count
 But I'm not able to see how to do a distinct count on userId and also apply
 another groupBy on the timestamp field. Please let me know how to handle such
 cases.
 Thanks!
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Count-distinct-with-groupBy-usage-tp9781.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Large scale ranked recommendation

2014-07-18 Thread Nick Pentreath
It is very true that making predictions in batch for all 1 million users
against the 10k items will be quite onerous in terms of computation. I have
run into this issue too in making batch predictions.

Some ideas:

1. Do you really need to generate recommendations for each user in batch?
How are you serving these recommendations? In most cases, you only need to
make recs when a user is actively interacting with your service or product
etc. Doing it all in batch tends to be a big waste of computation resources.

In our system for example we are serving them in real time (as a user
arrives at a web page, say, our customer hits our API for recs), so we only
generate the rec at that time. You can take a look at Oryx for this (
https://github.com/cloudera/oryx) though it does not yet support Spark, you
may be able to save the model into the correct format in HDFS and have Oryx
read the data.

2. If you do need to make the recs in batch, then I would suggest:
(a) because you have few items, I would collect the item vectors and form a
matrix.
(b) broadcast that matrix
(c) do a mapPartitions on the user vectors. Form a user matrix from the
vectors in each partition (maybe create quite a few partitions to make each
user matrix not too big)
(d) do a value call on the broadcasted item matrix
(e) now for each partition you have the (small) item matrix and a (larger)
user matrix. Do a matrix multiply and you end up with a (U x I) matrix with
the scores for each user in the partition. Because you are using BLAS here,
it will be significantly faster than individually computed dot products
(f) sort the scores for each user and take top K
(g) save or collect and do whatever with the scores

3. in conjunction with (2) you can try throwing more resources at the
problem too

If you access the underlying Breeze vectors (I think the toBreeze method is
private so you may have to re-implement it), you can do all this using
Breeze (e.g. concatenating vectors to make matrices, iterating and whatnot).
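
A rough sketch of idea (2), assuming ALS user/item factors as RDD[(Int, Array[Double])],
a known rank, and plain Breeze for the linear algebra (treat it as illustrative rather
than tested code):

import breeze.linalg.DenseMatrix

val rank = 10   // the factor dimension used when training ALS
val k = 20      // recommendations per user

// (a) + (b): collect the (small) item factors and broadcast them as a rank x numItems matrix
val itemArr = itemFeatures.collect()                  // itemFeatures: RDD[(Int, Array[Double])]
val itemIds = itemArr.map(_._1)
val itemMat = new DenseMatrix(rank, itemArr.length, itemArr.flatMap(_._2))
val bcItems = sc.broadcast((itemIds, itemMat))

// (c) to (f): per partition, build a user matrix, multiply, and keep the top k per user
val topKPerUser = userFeatures.mapPartitions { users =>
  val (ids, items) = bcItems.value
  val userArr = users.toArray
  if (userArr.isEmpty) Iterator.empty
  else {
    val userMat = new DenseMatrix(rank, userArr.length, userArr.flatMap(_._2))
    val scores = userMat.t * items                    // numUsersInPartition x numItems, one BLAS call
    userArr.indices.iterator.map { i =>
      val row = Array.tabulate(ids.length)(j => scores(i, j))
      val top = row.zip(ids).sortBy(-_._1).take(k)
      (userArr(i)._1, top.map { case (score, itemId) => (itemId, score) })
    }
  }
}
// (g): save or collect topKPerUser as needed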

Hope that helps

Nick


On Fri, Jul 18, 2014 at 1:17 AM, m3.sharma sharm...@umn.edu wrote:

 Yes, that's what prediction should be doing: taking dot products or the sigmoid
 function for each (user, item) pair. For 1 million users and 10K items
 there are 10 billion pairs.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10107.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Large scale ranked recommendation

2014-07-18 Thread Nick Pentreath
Agree GPUs may be interesting for this kind of massively parallel linear
algebra on reasonable size vectors.

These projects might be of interest in this regard:
https://github.com/BIDData/BIDMach
https://github.com/BIDData/BIDMat
https://github.com/dlwh/gust

Nick



On Fri, Jul 18, 2014 at 7:40 PM, m3.sharma sharm...@umn.edu wrote:

 Thanks Nick, the real-time suggestion is good; we will see if we can add that to
 our deployment strategy, and you are correct that we may not need recommendations
 for each user.

 Will try adding more resources and the broadcast-item-features suggestion, as
 currently the item features don't seem to be huge.

 As both users and items will continue to grow in future, I think a few GPU nodes
 will suffice to serve faster recommendations after learning the model with Spark.
 It would be great to have built-in GPU support in Spark for faster computations,
 to leverage the GPU capability of nodes for performing these flops faster.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10183.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: NullPointerException When Reading Avro Sequence Files

2014-07-19 Thread Nick Pentreath
I got this working locally a little while ago when playing around with
AvroKeyInputFile: https://gist.github.com/MLnick/5864741781b9340cb211

But not sure about AvroSequenceFile. Any chance you have an example
datafile or records?
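
For plain Avro data files the shape of that approach is roughly the following (assuming
GenericRecord data; for AvroSequenceFile there is, if I recall, an AvroSequenceFileInputFormat
in avro-mapred that could be swapped in):

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable

val records = sc.newAPIHadoopFile(
    "/path/to/data.avro",                        // placeholder path
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable])
  .map { case (k, _) => k.datum().toString }     // pull the record out before collecting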



On Sat, Jul 19, 2014 at 11:00 AM, Sparky gullo_tho...@bah.com wrote:

 To be more specific, I'm working with a system that stores data in
 org.apache.avro.hadoop.io.AvroSequenceFile format.  An AvroSequenceFile is
 A wrapper around a Hadoop SequenceFile that also supports reading and
 writing Avro data.

 It seems that Spark does not support this out of the box.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-when-reading-Avro-Sequence-files-tp10201p10234.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark clustered client

2014-07-23 Thread Nick Pentreath
At the moment your best bet for sharing SparkContexts across jobs will be 
Ooyala job server: https://github.com/ooyala/spark-jobserver


It doesn't yet support spark 1.0 though I did manage to amend it to get it to 
build and run on 1.0
—
Sent from Mailbox

On Wed, Jul 23, 2014 at 1:21 AM, Asaf Lahav asaf.la...@gmail.com wrote:

 Hi Folks,
 I have been trying to dig up some information regarding the
 possibilities when deploying more than one client process that
 consumes Spark.
 Let's say I have a Spark Cluster of 10 servers, and would like to setup 2
 additional servers which are sending requests to it through a Spark
 context, referencing one specific file of 1TB of data.
 Each client process, has its own SparkContext instance.
 Currently, the result is that that same file is loaded into memory twice
 because the Spark Context resources are not shared between processes/jvms.
 I wouldn't like to have that same file loaded over and over again with
 every new client being introduced.
 What would be the best practice here? Am I missing something?
 Thank you,
 Asaf

Re: Workarounds for accessing sequence file data via PySpark?

2014-07-23 Thread Nick Pentreath
Load from sequenceFile for PySpark is in master, and save is underway in this PR
(https://github.com/apache/spark/pull/1338).

I hope that Kan will have it ready to merge in time for the 1.1 release window
(it should be, the PR just needs a final review or two).

In the meantime you can check out master and test out the sequenceFile load
support in PySpark (there are examples in the /examples project and in
python test, and some documentation in /docs)


On Wed, Jul 23, 2014 at 4:42 PM, Gary Malouf malouf.g...@gmail.com wrote:

 I am aware that today PySpark can not load sequence files directly.  Are
 there work-arounds people are using (short of duplicating all the data to
 text files) for accessing this data?



Re: iScala or Scala-notebook

2014-07-29 Thread Nick Pentreath
IScala itself seems to be a bit dead unfortunately.

I did come across this today: https://github.com/tribbloid/ISpark


On Fri, Jul 18, 2014 at 4:59 AM, ericjohnston1989 
ericjohnston1...@gmail.com wrote:

 Hey everyone,

 I know this was asked before but I'm wondering if there have since been any
 updates. Are there any plans to integrate iScala/Scala-notebook with spark
 in the near future?

 This seems like something a lot of people would find very useful, so I was
 just wondering if anyone has started working on it.

 Thanks,

 Eric



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/iScala-or-Scala-notebook-tp10127.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: zip two RDD in pyspark

2014-07-30 Thread Nick Pentreath
parallelize uses the default Serializer (PickleSerializer) while textFile
uses UTF8Serializer.

You can get around this with index.zip(input_data._reserialize())  (or
index.zip(input_data.map(lambda x: x)))

(But if you try to just do this, you run into the issue with different
number of partitions):

index.zip(input_data._reserialize()).count()

Py4JJavaError: An error occurred while calling o60.collect.

: java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers
of partitions

at org.apache.spark.rdd.ZippedRDD.getPartitions(ZippedRDD.scala:55)






On Wed, Jul 30, 2014 at 7:53 AM, Davies Liu dav...@databricks.com wrote:

 On Mon, Jul 28, 2014 at 12:58 PM, l lishu...@gmail.com wrote:
  I have a file in s3 that I want to map each line with an index. Here is
 my
  code:
 
  input_data = sc.textFile('s3n:/myinput',minPartitions=6).cache()
  N input_data.count()
  index = sc.parallelize(range(N), 6)
  index.zip(input_data).collect()

 I think you cannot do zipWithIndex() in this way, because the number of
 lines in each partition of input_data will be different from index. You need
 to get the exact number of lines for each partition first, then generate the
 correct index. It will be easy to do with mapPartitions()

  nums = input_data.mapPartitions(lambda it: [sum(1 for i in
 it)]).collect()
  starts = [sum(nums[:i]) for i in range(len(nums))]
  zipped = input_data.mapPartitionsWithIndex(lambda i,it: ((starts[i]+j,
 x) for j,x in enumerate(it)))

 
  ...
  14/07/28 19:49:31 INFO DAGScheduler: Completed ResultTask(18, 4)
  14/07/28 19:49:31 INFO DAGScheduler: Stage 18 (collect at stdin:1)
  finished in 0.031 s
  14/07/28 19:49:31 INFO SparkContext: Job finished: collect at stdin:1,
  took 0.03707 s
  Traceback (most recent call last):
File stdin, line 1, in module
File /root/spark/python/pyspark/rdd.py, line 584, in collect
  return list(self._collect_iterator_through_file(bytesInJava))
File /root/spark/python/pyspark/rdd.py, line 592, in
  _collect_iterator_through_file
  self.ctx._writeToFile(iterator, tempFile.name)
File /root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
  line 537, in __call__
File /root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py, line
  300, in get_return_value
  py4j.protocol.Py4JJavaError: An error occurred while calling
  z:org.apache.spark.api.python.PythonRDD.writeToFile.
  : java.lang.ClassCastException: java.lang.String cannot be cast to [B
  at
 
 org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:312)
  at
 
 org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:309)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at
 
 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:309)
  at
 org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:342)
  at
 org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:337)
  at
 org.apache.spark.api.python.PythonRDD.writeToFile(PythonRDD.scala)
  at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
  at
 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
  at
 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
  at py4j.Gateway.invoke(Gateway.java:259)
  at
 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:207)
  at java.lang.Thread.run(Thread.java:744)

  As I see it, the job is completed, but I don't understand what's
 happening
  to 'String cannot be cast to [B'. I tried to zip two
 parallelCollectionRDD
  and it works fine. But here I have a MappedRDD at textFile. Not sure
 what's
  going on here.

 Could you provide an script and dataset to reproduce this error? Maybe
 there are some corner cases during serialization.


  Also, why Python does not have ZipWithIndex()?

 The features in PySpark are much fewer than in Scala Spark; hopefully it will
 catch up in the next two releases.

 
  Thanks for any help.
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/zip-two-RDD-in-pyspark-tp10806.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass with spark-submit

2014-08-07 Thread Nick Pentreath
I'm also getting this - Ryan we both seem to be running into this issue
with elasticsearch-hadoop :)

I tried spark.files.userClassPathFirst true on the command line and that
doesn't work.

If I put that line in spark/conf/spark-defaults it works, but now I'm
getting:
java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/InputFormat

I think I may need to add hadoop-client to my assembly, but any other ideas are
welcome.

Ryan, will let you know how I get on


On Mon, Aug 4, 2014 at 10:28 AM, Sean Owen so...@cloudera.com wrote:

 I'm guessing you have the Jackson classes in your assembly but so does
 Spark. Its classloader wins, and does not contain the class present in
 your app's version of Jackson. Try spark.files.userClassPathFirst ?

 On Mon, Aug 4, 2014 at 6:28 AM, Ryan Braley r...@traintracks.io wrote:
  Hi Folks,
 
   I have an assembly jar that I am submitting using spark-submit script
 on a
  cluster I created with the spark-ec2 script. I keep running into the
  java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass
  error on my workers even though jar tf clearly shows that class being a
 part
  of my assembly jar. I have the spark program working locally.
  Here is the error log:
  https://gist.github.com/rbraley/cf5cd3457a89b1c0ac88
 
  Anybody have any suggestions of things I can try? It seems
 
 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3c1403899110.65393.yahoomail...@web160503.mail.bf1.yahoo.com%3E
  that this is a similar error. I am open to recompiling spark to fix this,
  but I would like to run my job on my cluster rather than just locally.
 
  Thanks,
  Ryan
 
 
  Ryan Braley  |  Founder
  http://traintracks.io/
 
  US: +1 (206) 866 5661
  CN: +86 156 1153 7598
  Coding the future. Decoding the game.
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass with spark-submit

2014-08-08 Thread Nick Pentreath
By the way, for anyone using elasticsearch-hadoop, there is a fix for this
here: https://github.com/elasticsearch/elasticsearch-hadoop/issues/239

Ryan - using the nightly snapshot build of 2.1.0.BUILD-SNAPSHOT fixed this
for me.


On Thu, Aug 7, 2014 at 3:58 PM, Nick Pentreath nick.pentre...@gmail.com
wrote:

 I'm also getting this - Ryan we both seem to be running into this issue
 with elasticsearch-hadoop :)

 I tried spark.files.userClassPathFirst true on command line and that
 doesn;t work

 If I put it that line in spark/conf/spark-defaults it works but now I'm
 getting:
 java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/InputFormat

 think I may need to add hadoop-client to my assembly, but any other ideas
 welcome.

 Ryan, will let you know how I get on


 On Mon, Aug 4, 2014 at 10:28 AM, Sean Owen so...@cloudera.com wrote:

 I'm guessing you have the Jackson classes in your assembly but so does
 Spark. Its classloader wins, and does not contain the class present in
 your app's version of Jackson. Try spark.files.userClassPathFirst ?

 On Mon, Aug 4, 2014 at 6:28 AM, Ryan Braley r...@traintracks.io wrote:
  Hi Folks,
 
   I have an assembly jar that I am submitting using spark-submit script
 on a
  cluster I created with the spark-ec2 script. I keep running into the
  java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass
  error on my workers even though jar tf clearly shows that class being a
 part
  of my assembly jar. I have the spark program working locally.
  Here is the error log:
  https://gist.github.com/rbraley/cf5cd3457a89b1c0ac88
 
  Anybody have any suggestions of things I can try? It seems
 
 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3c1403899110.65393.yahoomail...@web160503.mail.bf1.yahoo.com%3E
  that this is a similar error. I am open to recompiling spark to fix
 this,
  but I would like to run my job on my cluster rather than just locally.
 
  Thanks,
  Ryan
 
 
  Ryan Braley  |  Founder
  http://traintracks.io/
 
  US: +1 (206) 866 5661
  CN: +86 156 1153 7598
  Coding the future. Decoding the game.
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Failed running Spark ALS

2014-09-19 Thread Nick Pentreath
Have you set spark.local.dir (I think this is the config setting)?

It needs to point to a volume with plenty of space.

By default, if I recall, it points to /tmp.
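
For example (the mount point is just an assumption; anything with enough space works):

import org.apache.spark.{SparkConf, SparkContext}

// the same key can also go in conf/spark-defaults.conf as:  spark.local.dir  /mnt/spark
val conf = new SparkConf()
  .setAppName("ALS")
  .set("spark.local.dir", "/mnt/spark")
val sc = new SparkContext(conf)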

Sent from my iPhone

 On 19 Sep 2014, at 23:35, jw.cmu jinliangw...@gmail.com wrote:
 
 I'm trying to run Spark ALS using the netflix dataset but it failed due to a No
 space on device exception. It seems the exception is thrown after the
 training phase. It's not clear to me what is being written and where the
 output directory is.
 
 I was able to run the same code on the provided test.data dataset.
 
 I'm new to Spark and I'd like to get some hints for resolving this problem.
 
 The code I ran was got from
 https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html (the
 Java version).
 
 Relevant info:
 
 Spark version: 1.0.2 (Standalone deployment)
 # slaves/workers/exectuors: 8
 Core per worker: 64
 memory per executor: 100g
 
 Application parameters are left as default.
 
 
 
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Failed-running-Spark-ALS-tp14704.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark 1.1.0 - hbase 0.98.6-hadoop2 version - py4j.protocol.Py4JJavaError java.lang.ClassNotFoundException

2014-10-04 Thread Nick Pentreath
forgot to copy user list

On Sat, Oct 4, 2014 at 3:12 PM, Nick Pentreath nick.pentre...@gmail.com
wrote:

 what version did you put in the pom.xml?

 it does seem to be in Maven central:
 http://search.maven.org/#artifactdetails%7Corg.apache.hbase%7Chbase%7C0.98.6-hadoop2%7Cpom

 <dependency>
   <groupId>org.apache.hbase</groupId>
   <artifactId>hbase</artifactId>
   <version>0.98.6-hadoop2</version>
 </dependency>

 Note you shouldn't need to rebuild Spark, I think just the example project
 via sbt examples/assembly

 On Fri, Oct 3, 2014 at 10:55 AM, serkan.dogan foreignerdr...@yahoo.com
 wrote:

 Hi,
 I installed hbase-0.98.6-hadoop2. It's working not any problem with that.


  When I try to run the spark hbase python examples (the wordcount examples are
  working - so it's not a python issue)

  ./bin/spark-submit  --master local --driver-class-path
 ./examples/target/spark-examples_2.10-1.1.0.jar
 ./examples/src/main/python/hbase_inputformat.py localhost myhbasetable

 the process exit with ClassNotFoundException...

  I searched lots of blogs and sites; they all say the Spark 1.1 version is built
  with hbase 0.94.6 and should be rebuilt with your own hbase version.

  First, I tried changing the hbase version number in pom.xml -- but nothing was
  found in Maven central.

  Second, I tried compiling hbase from source, copying the jars from hbase/lib into
  the spark/lib_managed folder, and editing spark-defaults.conf

 my spark-defaults.conf

 spark.executor.extraClassPath

 /home/downloads/spark/spark-1.1.0/lib_managed/jars/hbase-server-0.98.6-hadoop2.jar:/home/downloads/spark/spark-1.1.0/lib_managed/jars/hbase-protocol-0.98.6-hadoop2.jar:/home/downloads/spark/spark-1.1.0/lib_managed/jars/hbase-hadoop2-compat-0.98.6-hadoop2.jar:/home/downloads/spark/spark-1.1.0/lib_managed/jars/hbase-client-0.98.6-hadoop2.jar:/home/downloads/spark/spark-1.1.0/lib_managed/jars/hbase-commont-0.98.6-hadoop2.jar:/home/downloads/spark/spark-1.1.0/lib_managed/jars/htrace-core-2.04.jar


  My question is: how can I work with hbase 0.98.6-hadoop2 and spark 1.1.0?

 Here is the exception message


 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 14/10/03 11:27:15 WARN Utils: Your hostname, xxx.yyy.com resolves to a
 loopback address: 127.0.0.1; using 1.1.1.1 instead (on interface eth0)
 14/10/03 11:27:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
 another address
 14/10/03 11:27:15 INFO SecurityManager: Changing view acls to: root,
 14/10/03 11:27:15 INFO SecurityManager: Changing modify acls to: root,
 14/10/03 11:27:15 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(root, );
 users
 with modify permissions: Set(root, )
 14/10/03 11:27:16 INFO Slf4jLogger: Slf4jLogger started
 14/10/03 11:27:16 INFO Remoting: Starting remoting
 14/10/03 11:27:16 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkdri...@1-1-1-1-1.rev.mydomain.io:49256]
 14/10/03 11:27:16 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://sparkdri...@1-1-1-1-1.rev.mydomain.io:49256]
 14/10/03 11:27:16 INFO Utils: Successfully started service 'sparkDriver'
 on
 port 49256.
 14/10/03 11:27:16 INFO SparkEnv: Registering MapOutputTracker
 14/10/03 11:27:16 INFO SparkEnv: Registering BlockManagerMaster
 14/10/03 11:27:16 INFO DiskBlockManager: Created local directory at
 /tmp/spark-local-20141003112716-298d
 14/10/03 11:27:16 INFO Utils: Successfully started service 'Connection
 manager for block manager' on port 35106.
 14/10/03 11:27:16 INFO ConnectionManager: Bound socket to port 35106 with
 id
 = ConnectionManagerId(1-1-1-1-1.rev.mydomain.io,35106)
 14/10/03 11:27:16 INFO MemoryStore: MemoryStore started with capacity
 267.3
 MB
 14/10/03 11:27:16 INFO BlockManagerMaster: Trying to register BlockManager
 14/10/03 11:27:16 INFO BlockManagerMasterActor: Registering block manager
 1-1-1-1-1.rev.mydomain.io:35106 with 267.3 MB RAM
 14/10/03 11:27:16 INFO BlockManagerMaster: Registered BlockManager
 14/10/03 11:27:16 INFO HttpFileServer: HTTP File server directory is
 /tmp/spark-f60b0533-998f-4af2-a208-d04c571eab82
 14/10/03 11:27:16 INFO HttpServer: Starting HTTP Server
 14/10/03 11:27:16 INFO Utils: Successfully started service 'HTTP file
 server' on port 49611.
 14/10/03 11:27:16 INFO Utils: Successfully started service 'SparkUI' on
 port
 4040.
 14/10/03 11:27:16 INFO SparkUI: Started SparkUI at
 http://1-1-1-1-1.rev.mydomain.io:4040
 14/10/03 11:27:16 INFO Utils: Copying

 /home/downloads/spark/spark-1.1.0/./examples/src/main/python/hbase_inputformat.py
 to /tmp/spark-7232227a-0547-454e-9f68-805fa7b0c2f0/hbase_inputformat.py
 14/10/03 11:27:16 INFO SparkContext: Added file

 file:/home/downloads/spark/spark-1.1.0/./examples/src/main/python/hbase_inputformat.py
 at http://1.1.1.1:49611/files/hbase_inputformat.py with timestamp
 1412324836837
 14/10/03 11:27:16 INFO AkkaUtils: Connecting to HeartbeatReceiver:
 akka.tcp://
 sparkdri...@1-1-1-1-1.rev.mydomain.io:49256/user/HeartbeatReceiver
 Traceback (most

Re: word2vec: how to save an mllib model and reload it?

2014-11-07 Thread Nick Pentreath
Currently I see the word2vec model is collected onto the master, so the model 
itself is not distributed. 


I guess the question is why do you need a distributed model? Is the vocab size
so large that it's necessary? For model serving in general, unless the model is
truly massive (i.e. cannot fit into memory on a modern high-end box with 64 or
128GB of RAM) then a single instance is way faster and simpler (using a cluster of
machines is more for load balancing / fault tolerance).




What is your use case for model serving?


—
Sent from Mailbox

On Fri, Nov 7, 2014 at 5:47 PM, Duy Huynh duy.huynh@gmail.com wrote:

 you're right, serialization works.
 what is your suggestion on saving a distributed model?  so part of the
 model is in one cluster, and some other parts of the model are in other
 clusters.  during runtime, these sub-models run independently in their own
 clusters (load, train, save).  and at some point during run time these
 sub-models merge into the master model, which also loads, trains, and saves
 at the master level.
 much appreciated.
 On Fri, Nov 7, 2014 at 2:53 AM, Evan R. Sparks evan.spa...@gmail.com
 wrote:
 There's some work going on to support PMML -
 https://issues.apache.org/jira/browse/SPARK-1406 - but it's not yet been
 merged into master.

 What are you used to doing in other environments? In R I'm used to running
 save(), same with matlab. In python either pickling things or dumping to
 json seems pretty common. (even the scikit-learn docs recommend pickling -
 http://scikit-learn.org/stable/modules/model_persistence.html). These all
  seem basically equivalent to java serialization to me.

 Would some helper functions (in, say, mllib.util.modelpersistence or
 something) make sense to add?

 On Thu, Nov 6, 2014 at 11:36 PM, Duy Huynh duy.huynh@gmail.com
 wrote:

 that works.  is there a better way in spark?  this seems like the most
 common feature for any machine learning work - to be able to save your
 model after training it and load it later.

 On Fri, Nov 7, 2014 at 2:30 AM, Evan R. Sparks evan.spa...@gmail.com
 wrote:

 Plain old java serialization is one straightforward approach if you're
 in java/scala.
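
 A minimal sketch of that approach (this assumes the model class is Serializable,
 which the MLlib models generally are; the path and names are illustrative):

   import java.io._

   def saveModel(model: Serializable, path: String): Unit = {
     val out = new ObjectOutputStream(new FileOutputStream(path))
     try out.writeObject(model) finally out.close()
   }

   def loadModel[T](path: String): T = {
     val in = new ObjectInputStream(new FileInputStream(path))
     try in.readObject().asInstanceOf[T] finally in.close()
   }

   // e.g. saveModel(word2vecModel, "/tmp/word2vec.model")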

 On Thu, Nov 6, 2014 at 11:26 PM, ll duy.huynh@gmail.com wrote:

 what is the best way to save an mllib model that you just trained and
 reload
 it in the future?  specifically, i'm using the mllib word2vec model...
 thanks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/word2vec-how-to-save-an-mllib-model-and-reload-it-tp18329.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: word2vec: how to save an mllib model and reload it?

2014-11-07 Thread Nick Pentreath
For ALS if you want real time recs (and usually this is order 10s to a few 100s 
ms response), then Spark is not the way to go - a serving layer like Oryx, or 
prediction.io is what you want.


(At graphflow we've built our own).




You hold the factor matrices in memory and do the dot product in real time 
(with optional caching). Again, even for huge models (10s of millions 
users/items) this can be handled on a single, powerful instance. The issue at 
this scale is winnowing down the search space using LSH or similar approach to 
get to real time speeds.
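
As a very rough sketch of that serving pattern (plain Scala, nothing Spark-specific -
the factor vectors are assumed to have been collected from the model up front):

  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum

  def topK(userVec: Array[Double],
           itemVecs: Map[Int, Array[Double]],
           k: Int): Seq[(Int, Double)] =
    itemVecs.toSeq
      .map { case (itemId, vec) => (itemId, dot(userVec, vec)) }
      .sortBy(-_._2)
      .take(k)

This is the brute-force version; the LSH-style pruning mentioned above is what you'd
add once the item count makes the full scan too slow.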




For word2vec it's pretty much the same thing as what you have is very similar 
to one of the ALS factor matrices.




One problem is you can't access the word2vec vectors as they are a private val. I
think this should be changed actually, so that just the word vectors could be
saved and used in a serving layer.


—
Sent from Mailbox

On Fri, Nov 7, 2014 at 7:37 PM, Evan R. Sparks evan.spa...@gmail.com
wrote:

 There are a few examples where this is the case. Let's take ALS, where the
 result is a MatrixFactorizationModel, which is assumed to be big - the
 model consists of two matrices, one (users x k) and one (k x products).
 These are represented as RDDs.
 You can save these RDDs out to disk by doing something like
 model.userFeatures.saveAsObjectFile(...) and
 model.productFeatures.saveAsObjectFile(...)
 to save out to HDFS or Tachyon or S3.
 Then, when you want to reload you'd have to instantiate them into a class
 of MatrixFactorizationModel. That class is package private to MLlib right
 now, so you'd need to copy the logic over to a new class, but that's the
 basic idea.
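
  A rough sketch of the save/reload part (paths are illustrative, and as noted you
  cannot reconstruct MatrixFactorizationModel directly since its constructor is
  package private - you would work with the raw factor RDDs or copy the scoring
  logic):

    // save
    model.userFeatures.saveAsObjectFile("hdfs:///models/als/userFeatures")
    model.productFeatures.saveAsObjectFile("hdfs:///models/als/productFeatures")

    // reload later
    val userFeatures = sc.objectFile[(Int, Array[Double])]("hdfs:///models/als/userFeatures")
    val productFeatures = sc.objectFile[(Int, Array[Double])]("hdfs:///models/als/productFeatures")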
 That said - using spark to serve these recommendations on a point-by-point
 basis might not be optimal. There's some work going on in the AMPLab to
 address this issue.
 On Fri, Nov 7, 2014 at 7:44 AM, Duy Huynh duy.huynh@gmail.com wrote:
 you're right, serialization works.

 what is your suggestion on saving a distributed model?  so part of the
 model is in one cluster, and some other parts of the model are in other
 clusters.  during runtime, these sub-models run independently in their own
 clusters (load, train, save).  and at some point during run time these
 sub-models merge into the master model, which also loads, trains, and saves
 at the master level.

 much appreciated.



 On Fri, Nov 7, 2014 at 2:53 AM, Evan R. Sparks evan.spa...@gmail.com
 wrote:

 There's some work going on to support PMML -
 https://issues.apache.org/jira/browse/SPARK-1406 - but it's not yet been
 merged into master.

 What are you used to doing in other environments? In R I'm used to
 running save(), same with matlab. In python either pickling things or
 dumping to json seems pretty common. (even the scikit-learn docs recommend
 pickling - http://scikit-learn.org/stable/modules/model_persistence.html).
  These all seem basically equivalent to java serialization to me.

 Would some helper functions (in, say, mllib.util.modelpersistence or
 something) make sense to add?

 On Thu, Nov 6, 2014 at 11:36 PM, Duy Huynh duy.huynh@gmail.com
 wrote:

 that works.  is there a better way in spark?  this seems like the most
 common feature for any machine learning work - to be able to save your
 model after training it and load it later.

 On Fri, Nov 7, 2014 at 2:30 AM, Evan R. Sparks evan.spa...@gmail.com
 wrote:

 Plain old java serialization is one straightforward approach if you're
 in java/scala.

 On Thu, Nov 6, 2014 at 11:26 PM, ll duy.huynh@gmail.com wrote:

 what is the best way to save an mllib model that you just trained and
 reload
 it in the future?  specifically, i'm using the mllib word2vec model...
 thanks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/word2vec-how-to-save-an-mllib-model-and-reload-it-tp18329.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: pyspark get column family and qualifier names from hbase table

2014-11-11 Thread Nick Pentreath
Feel free to add that converter as an option in the Spark examples via a PR :)

—
Sent from Mailbox

On Wed, Nov 12, 2014 at 3:27 AM, alaa contact.a...@gmail.com wrote:

 Hey freedafeng, I'm exactly where you are. I want the output to show the
 rowkey and all column qualifiers that correspond to it. How did you write
 HBaseResultToStringConverter to do what you wanted it to do?
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-get-column-family-and-qualifier-names-from-hbase-table-tp18613p18650.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

Re: RMSE in MovieLensALS increases or stays stable as iterations increase.

2014-11-26 Thread Nick Pentreath
copying user group - I keep replying directly vs reply all :)

On Wed, Nov 26, 2014 at 2:03 PM, Nick Pentreath nick.pentre...@gmail.com
wrote:

 ALS will be guaranteed to decrease the squared error (therefore RMSE) in
 each iteration, on the *training* set.

 This does not hold for the *test* set / cross validation. You would
 expect the test set RMSE to stabilise as iterations increase, since the
 algorithm converges - but not necessarily to decrease.

 On Wed, Nov 26, 2014 at 1:57 PM, Kostas Kloudas kklou...@gmail.com
 wrote:

 Hi all,

 I am getting familiarized with Mllib and a thing I noticed is that
 running the MovieLensALS
 example on the movieLens dataset for increasing number of iterations does
 not decrease the
 rmse.

  The results for a 0.6 training fraction and 0.4 test fraction are below. For a
  training fraction of 0.8, the results
  are almost identical. Shouldn't it be normal to see a decreasing error?
 Especially going from 1 to 5 iterations.

 Running 1 iterations
 Test RMSE for 1 iter. = 1.2452964343277886 (52.75712592704 s).
 Running 5 iterations
 Test RMSE for 5 iter. = 1.3258973764470259 (61.183927666 s).
 Running 9 iterations
 Test RMSE for 9 iter. = 1.3260308117704385 (61.8494887581 s).
 Running 13 iterations
 Test RMSE for 13 iter. = 1.3260310099809915 (73.799510125 s).
 Running 17 iterations
 Test RMSE for 17 iter. = 1.3260310102735398 (77.5651218531 s).
 Running 21 iterations
 Test RMSE for 21 iter. = 1.3260310102739703 (79.607495074 s).
 Running 25 iterations
 Test RMSE for 25 iter. = 1.326031010273971 (88.631776301 s).
 Running 29 iterations
 Test RMSE for 29 iter. = 1.3260310102739712 (101.178383079 s).

 Thanks  a lot,
 Kostas
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: locality sensitive hashing for spark

2014-12-21 Thread Nick Pentreath
Looks interesting thanks for sharing.


Does it support cosine similarity? I only saw Jaccard mentioned from a quick
glance.


—
Sent from Mailbox

On Mon, Dec 22, 2014 at 4:12 AM, morr0723 michael.d@gmail.com wrote:

 I've pushed out an implementation of locality sensitive hashing for spark.
 LSH has a number of use cases, most prominent being if the features are not
 based in Euclidean space. 
 Code, documentation, and small exemplar dataset is available on github:
 https://github.com/mrsqueeze/spark-hash
 Feel free to pass along any comments or issues.
 Enjoy!
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/locality-sensitive-hashing-for-spark-tp20803.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

Re: SaveAsTextFile to S3 bucket

2015-01-26 Thread Nick Pentreath
Your output folder specifies

rdd.saveAsTextFile("s3n://nexgen-software/dev/output");

So it will try to write to /dev/output which is as expected. If you create
the directory /dev/output upfront in your bucket, and try to save it to
that (empty) directory, what is the behaviour?

On Tue, Jan 27, 2015 at 6:21 AM, Chen, Kevin kevin.c...@neustar.biz wrote:

  Does anyone know if I can save a RDD as a text file to a pre-created
 directory in S3 bucket?

  I have a directory created in S3 bucket: //nexgen-software/dev

  When I tried to save a RDD as text file in this directory:
 rdd.saveAsTextFile("s3n://nexgen-software/dev/output");


  I got following exception at runtime:

 Exception in thread main org.apache.hadoop.fs.s3.S3Exception:
 org.jets3t.service.S3ServiceException: S3 HEAD request failed for '/dev' -
 ResponseCode=403, ResponseMessage=Forbidden


  I have verified /dev has write permission. However, if I grant the
 bucket //nexgen-software write permission, I don't get the exception. But the
 output is not created under dev. Rather, a different /dev/output directory
 is created directly in the bucket (//nexgen-software). Is this how
 saveAsTextFile behaves in S3? Is there any way I can have the output created
 under a pre-defined directory?


  Thanks in advance.







Re: Is it possible to do incremental training using ALSModel (MLlib)?

2015-01-07 Thread Nick Pentreath
As I recall Oryx (the old version, and I assume the new one too) provide
something like this:
http://cloudera.github.io/oryx/apidocs/com/cloudera/oryx/als/common/OryxRecommender.html#recommendToAnonymous-java.lang.String:A-float:A-int-

though Sean will be more on top of that than me :)
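
If it helps, the "recommend to anonymous" approach boils down to folding the user's
recent ratings into the existing item factors. A rough Breeze sketch of that
least-squares fold-in (the names, the regularization value and the Map of item
factors are just placeholders, not anything from Oryx itself):

  import breeze.linalg._

  // ratings: (itemId, rating) pairs for the new/updated user
  // itemFactors: item factor vectors pulled from the trained ALS model
  def foldInUser(ratings: Seq[(Int, Double)],
                 itemFactors: Map[Int, DenseVector[Double]],
                 rank: Int,
                 lambda: Double = 0.1): DenseVector[Double] = {
    val YtY = DenseMatrix.zeros[Double](rank, rank)
    val Ytr = DenseVector.zeros[Double](rank)
    for ((itemId, r) <- ratings; y <- itemFactors.get(itemId)) {
      YtY += y * y.t   // outer product
      Ytr += y * r
    }
    (YtY + DenseMatrix.eye[Double](rank) * lambda) \ Ytr
  }

The resulting vector can then be scored against the item factors exactly as for an
existing user.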

On Mon, Jan 5, 2015 at 2:17 PM, Wouter Samaey wouter.sam...@storefront.be
wrote:

 One other idea was that I don’t need to re-train the model, but simply
  pass all the current user's recent ratings (including ones created after
 the training) to the existing model…

 Is this a valid option?


 
 Wouter Samaey
 Zaakvoerder Storefront BVBA

 Tel: +32 472 72 83 07
 Web: http://storefront.be

 LinkedIn: http://www.linkedin.com/in/woutersamaey

  On 05 Jan 2015, at 13:13, Sean Owen so...@cloudera.com wrote:
 
  In the first instance, I'm suggesting that ALS in Spark could perhaps
  expose a run() method that accepts a previous
  MatrixFactorizationModel, and uses the product factors from it as the
  initial state instead. If anybody seconds that idea, I'll make a PR.
 
  The second idea is just fold-in:
 
 http://www.slideshare.net/srowen/big-practical-recommendations-with-alternating-least-squares/14
 
  Whether you do this or something like SGD, inside or outside Spark,
  depends on your requirements I think.
 
  On Sat, Jan 3, 2015 at 12:04 PM, Wouter Samaey
  wouter.sam...@storefront.be wrote:
  Do you know a place where I could find a sample or tutorial for this?
 
  I'm still very new at this. And struggling a bit...
 
  Thanks in advance
 
  Wouter
 
  Sent from my iPhone.
 
  On 03 Jan 2015, at 10:36, Sean Owen so...@cloudera.com wrote:
 
  Yes, it is easy to simply start a new factorization from the current
 model
  solution. It works well. That's more like incremental *batch*
 rebuilding of
  the model. That is not in MLlib but fairly trivial to add.
 
  You can certainly 'fold in' new data to approximately update with one
 new
  datum too, which you can find online. This is not quite the same idea as
  streaming SGD. I'm not sure this fits the RDD model well since it
 entails
  updating one element at a time but mini batch could be reasonable.
 
  On Jan 3, 2015 5:29 AM, Peng Cheng rhw...@gmail.com wrote:
 
  I was under the impression that ALS wasn't designed for it :- The
 famous
  ebay online recommender uses SGD
  However, you can try using the previous model as starting point, and
   gradually reduce the number of iterations after the model stabilizes. I never
   verified this idea, so you need to at least cross-validate it before putting
   it into production.
 
  On 2 January 2015 at 04:40, Wouter Samaey wouter.sam...@storefront.be
 
  wrote:
 
  Hi all,
 
  I'm curious about MLlib and if it is possible to do incremental
 training
  on
  the ALSModel.
 
  Usually training is run first, and then you can query. But in my case,
  data
  is collected in real-time and I want the predictions of my ALSModel to
  consider the latest data without complete re-training phase.
 
  I've checked out these resources, but could not find any info on how
 to
  solve this:
 
 https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html
 
 
 http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html
 
  My question fits in a larger picture where I'm using Prediction IO,
 and
  this
  in turn is based on Spark.
 
  Thanks in advance for any advice!
 
  Wouter
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-do-incremental-training-using-ALSModel-MLlib-tp20942.html
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 
 


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Did DataFrames break basic SQLContext?

2015-03-18 Thread Nick Pentreath
To answer your first question - yes, 1.3.0 did break backward compatibility for
the change from SchemaRDD -> DataFrame. Spark SQL was an alpha component so API
breaking changes could happen. It is no longer an alpha component as of 1.3.0,
so this will not be the case in future.




Adding toDF should hopefully not be too much of an effort.




For the second point - I have also seen these exceptions when upgrading jobs to
1.3.0 - but they don't fail my jobs. Not sure what the cause is; it would be good
to understand this.









—
Sent from Mailbox

On Wed, Mar 18, 2015 at 5:22 PM, Justin Pihony justin.pih...@gmail.com
wrote:

 I started to play with 1.3.0 and found that there are a lot of breaking
 changes. Previously, I could do the following:
 case class Foo(x: Int)
 val rdd = sc.parallelize(List(Foo(1)))
 import sqlContext._
 rdd.registerTempTable("foo")
 Now, I am not able to directly use my RDD object and have it implicitly
 become a DataFrame. It can be used as a DataFrameHolder, of which I could
 write:
 rdd.toDF.registerTempTable("foo")
 But, that is kind of a pain in comparison. The other problem for me is that
 I keep getting a SQLException:
 java.sql.SQLException: Failed to start database 'metastore_db' with
 class loader  sun.misc.Launcher$AppClassLoader@10393e97, see the next
 exception for details.
 This seems to be a dependency on Hive, when previously (1.2.0) there was no
 such dependency. I can open tickets for these, but wanted to ask here
 firstmaybe I am doing something wrong?
 Thanks,
 Justin
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Did-DataFrames-break-basic-SQLContext-tp22120.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

Re: Iterative Algorithms with Spark Streaming

2015-03-16 Thread Nick Pentreath
MLlib supports streaming linear models:
http://spark.apache.org/docs/latest/mllib-linear-methods.html#streaming-linear-regression
and k-means:
http://spark.apache.org/docs/latest/mllib-clustering.html#k-means

With an iteration parameter of 1, this amounts to mini-batch SGD where the
mini-batch is the Spark Streaming batch.
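
A minimal sketch of the streaming regression API (the stream sources and feature size
are placeholders, and ssc is assumed to be an existing StreamingContext; with
numIterations = 1 each batch is effectively one mini-batch SGD step):

  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}

  val trainingData = ssc.textFileStream("/training/dir").map(LabeledPoint.parse)
  val testData = ssc.textFileStream("/test/dir").map(LabeledPoint.parse)

  val numFeatures = 3
  val model = new StreamingLinearRegressionWithSGD()
    .setInitialWeights(Vectors.zeros(numFeatures))
    .setNumIterations(1)

  model.trainOn(trainingData)
  model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

  ssc.start()
  ssc.awaitTermination()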

On Mon, Mar 16, 2015 at 2:57 PM, Alex Minnaar aminn...@verticalscope.com
wrote:

  I wanted to ask a basic question about the types of algorithms that are
 possible to apply to a DStream with Spark streaming.  With Spark it is
 possible to perform iterative computations on RDDs like in the gradient
 descent example


 val points = spark.textFile(...).map(parsePoint).cache()
 var w = Vector.random(D) // current separating plane
 for (i <- 1 to ITERATIONS) {
   val gradient = points.map(p =>
     (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
   ).reduce(_ + _)
   w -= gradient
 }


  which has a global state w that is updated after each iteration and the
 updated value is then used in the next iteration.  My question is whether
 this type of algorithm is possible if the points variable was a DStream
 instead of an RDD?  It seems like you could perform the same map as above
 which would create a gradient DStream and also use updateStateByKey to
 create a DStream for the w variable.  But the problem is that there doesn't
 seem to be a way to reuse the w DStream inside the map.  I don't think that
 it is possible for DStreams to communicate this way.  Am I correct that
 this is not possible with DStreams or am I missing something?


  Note:  The reason I ask this question is that many machine learning
 algorithms are trained by stochastic gradient descent.  sgd is similar to
 the above gradient descent algorithm except each iteration is on a new
 minibatch of data points rather than the same data points for every
 iteration.  It seems like Spark streaming provides a natural way to stream
 in these minibatches (as RDDs) but if it is not able to keep track of an
 updating global state variable then I don't think Spark streaming can be
 used for sgd.


  Thanks,


  Alex



Re: Software stack for Recommendation engine with spark mlib

2015-03-15 Thread Nick Pentreath
As Sean says, precomputing recommendations is pretty inefficient. Though with
500k items it's easy to get all the item vectors in memory, so pre-computing is
not too bad.




Still, since you plan to serve these via a REST service anyway, computing on 
demand via a serving layer such as Oryx or PredictionIO (or the newly open 
sourced Seldon.io) is a good option. You can also cache the recommendations 
quite aggressively - once you compute a user or item top-K list, just stick the 
result in mem cache / redis / whatever and evict it when you recompute your 
offline model, or every hour or whatever.






—
Sent from Mailbox

On Sun, Mar 15, 2015 at 3:03 PM, Shashidhar Rao
raoshashidhar...@gmail.com wrote:

 Thanks Sean, your suggestions and the links provided are just what I needed
 to start off with.
 On Sun, Mar 15, 2015 at 6:16 PM, Sean Owen so...@cloudera.com wrote:
 I think you're assuming that you will pre-compute recommendations and
 store them in Mongo. That's one way to go, with certain tradeoffs. You
 can precompute offline easily, and serve results at large scale
 easily, but, you are forced to precompute everything -- lots of wasted
 effort, not completely up to date.

 The front-end part of the stack looks right.

 Spark would do the model building; you'd have to write a process to
 score recommendations and store the result. Mahout is the same thing,
 really.

 500K items isn't all that large. Your requirements aren't driven just
 by items though. Number of users and latent features matter too. It
 matters how often you want to build the model too. I'm guessing you
 would get away with a handful of modern machines for a problem this
 size.


 In a way what you describe reminds me of Wibidata, since it built
 recommender-like solutions on top of data and results published to a
 NoSQL store. You might glance at the related OSS project Kiji
 (http://kiji.org/) for ideas about how to manage the schema.

 You should have a look at things like Nick's architecture for
 Graphflow, however it's more concerned with computing recommendation
 on the fly, and describes a shift from an architecture originally
 built around something like a NoSQL store:

 http://spark-summit.org/wp-content/uploads/2014/07/Using-Spark-and-Shark-to-Power-a-Realt-time-Recommendation-and-Customer-Intelligence-Platform-Nick-Pentreath.pdf

 This is also the kind of ground the oryx project is intended to cover,
 something I've worked on personally:
 https://github.com/OryxProject/oryx   -- a layer on and around the
 core model building in Spark + Spark Streaming to provide a whole
 recommender (for example), down to the REST API.

 On Sun, Mar 15, 2015 at 10:45 AM, Shashidhar Rao
 raoshashidhar...@gmail.com wrote:
  Hi,
 
  Can anyone who has developed recommendation engine suggest what could be
 the
  possible software stack for such an application.
 
  I am basically new to recommendation engine , I just found out Mahout and
  Spark Mlib which are available .
  I am thinking the below software stack.
 
  1. The user is going to use Android app.
  2.  Rest Api sent to app server from the android app to get
 recommendations.
  3. Spark Mlib core engine for recommendation engine
  4. MongoDB database backend.
 
  I would like to know more on the cluster configuration( how many nodes
 etc)
  part of spark for calculating the recommendations for 500,000 items. This
  items include products for day care etc.
 
  Other software stack suggestions would also be very useful.It has to run
 on
  multiple vendor machines.
 
  Please suggest.
 
  Thanks
  shashi


Re: Spark Release 1.3.0 DataFrame API

2015-03-14 Thread Nick Pentreath
I've found people.toDF gives you a data frame (roughly equivalent to the 
previous Row RDD),




And you can then call registerTempTable on that DataFrame.




So people.toDF.registerTempTable("people") should work
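
A minimal end-to-end sketch of that on 1.3 (the key bit is importing the implicits
from your SQLContext so toDF is available; the file path and case class are just for
illustration):

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._

  case class Person(name: String, age: Int)
  val people = sc.textFile("examples/src/main/resources/people.txt")
    .map(_.split(","))
    .map(p => Person(p(0), p(1).trim.toInt))

  people.toDF().registerTempTable("people")
  val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")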









—
Sent from Mailbox

On Sat, Mar 14, 2015 at 5:33 PM, David Mitchell jdavidmitch...@gmail.com
wrote:

 I am pleased with the release of the DataFrame API.  However, I started
 playing with it, and neither of the two main examples in the documentation
 work: http://spark.apache.org/docs/1.3.0/sql-programming-guide.html
 Specifically:
- Inferring the Schema Using Reflection
- Programmatically Specifying the Schema
 Scala 2.11.6
 Spark 1.3.0 prebuilt for Hadoop 2.4 and later
 *Inferring the Schema Using Reflection*
 scala> people.registerTempTable(people)
 console:31: error: value registerTempTable is not a member of
 org.apache.spark
 .rdd.RDD[Person]
   people.registerTempTable(people)
  ^
 *Programmatically Specifying the Schema*
 scala> val peopleDataFrame = sqlContext.createDataFrame(people, schema)
 console:41: error: overloaded method value createDataFrame with
 alternatives:
   (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass:
 Class[_])org.apache.spar
 k.sql.DataFrame and
   (rdd: org.apache.spark.rdd.RDD[_],beanClass:
 Class[_])org.apache.spark.sql.Dat
 aFrame and
   (rowRDD:
 org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],columns:
 java.util.List[String])org.apache.spark.sql.DataFrame and
   (rowRDD:
 org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: o
 rg.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame and
   (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema:
 org.apache
 .spark.sql.types.StructType)org.apache.spark.sql.DataFrame
  cannot be applied to (org.apache.spark.rdd.RDD[String],
 org.apache.spark.sql.ty
 pes.StructType)
val df = sqlContext.createDataFrame(people, schema)
 Any help would be appreciated.
 David

Re: How do you write Dataframes to elasticsearch

2015-03-25 Thread Nick Pentreath
Spark 1.3 is not supported by elasticsearch-hadoop yet but will be very soon: 
https://github.com/elastic/elasticsearch-hadoop/issues/400





However in the meantime you could use df.toRDD.saveToEs - though you may have 
to manipulate the Row object perhaps to extract fields, not sure if it will 
serialize directly to ES JSON...
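
A rough sketch of that workaround (this assumes the elasticsearch-spark jar is on the
classpath and es.nodes is configured; the index/type name is illustrative):

  import org.elasticsearch.spark._

  val fields = df.columns
  df.rdd
    .map(row => fields.zip(row.toSeq).toMap)
    .saveToEs("myindex/mytype")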



—
Sent from Mailbox

On Wed, Mar 25, 2015 at 2:07 PM, yamanoj manoj.per...@gmail.com wrote:

 It seems that elasticsearch-spark_2.10 currently does not support Spark 1.3.
 Could you tell me if there is an alternative way to save DataFrames to
 Elasticsearch?
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/How-do-you-write-Dataframes-to-elasticsearch-tp3.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

Re: upgrade from spark 1.2.1 to 1.3 on EC2 cluster and problems

2015-03-25 Thread Nick Pentreath
What version of Spark do the other dependencies rely on (Adam and H2O?) - that 
could be it




Or try sbt clean compile 



—
Sent from Mailbox

On Wed, Mar 25, 2015 at 5:58 PM, roni roni.epi...@gmail.com wrote:

 I have a EC2 cluster created using spark version 1.2.1.
 And I have a SBT project .
 Now I want to upgrade to spark 1.3 and use the new features.
 Below are issues .
 Sorry for the long post.
 Appreciate your help.
 Thanks
 -Roni
 Question - Do I have to create a new cluster using spark 1.3?
 Here is what I did -
 In my SBT file I  changed to -
 libraryDependencies += org.apache.spark %% spark-core % 1.3.0
 But then I started getting compilation error. along with
 Here are some of the libraries that were evicted:
 [warn]  * org.apache.spark:spark-core_2.10:1.2.0 - 1.3.0
 [warn]  * org.apache.hadoop:hadoop-client:(2.5.0-cdh5.2.0, 2.2.0) - 2.6.0
 [warn] Run 'evicted' to see detailed eviction warnings
  constructor cannot be instantiated to expected type;
 [error]  found   : (T1, T2)
 [error]  required: org.apache.spark.sql.catalyst.expressions.Row
 [error] val ty = joinRDD.map{case(word,
 (file1Counts, file2Counts)) = KmerIntesect(word, file1Counts,xyz)}
 [error]  ^
 Here is my SBT and code --
 SBT -
version := "1.0"

scalaVersion := "2.10.4"

resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
resolvers += "Maven Repo1" at "https://repo1.maven.org/maven2"
resolvers += "Maven Repo" at "https://s3.amazonaws.com/h2o-release/h2o-dev/master/1056/maven/repo/"

/* Dependencies - %% appends Scala version to artifactId */
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"
libraryDependencies += "org.bdgenomics.adam" % "adam-core" % "0.16.0"
libraryDependencies += "ai.h2o" % "sparkling-water-core_2.10" % "0.2.10"

CODE --
import org.apache.spark.{SparkConf, SparkContext}

case class KmerIntesect(kmer: String, kCount: Int, fileName: String)

object preDefKmerIntersection {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("preDefKmer-intersect")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.createSchemaRDD
    val bedFile = sc.textFile("s3n://a/b/c", 40)
    val hgfasta = sc.textFile("hdfs://a/b/c", 40)
    val hgPair = hgfasta.map(_.split(",")).map(a => (a(0), a(1).trim().toInt))
    val filtered = hgPair.filter(kv => kv._2 == 1)
    val bedPair = bedFile.map(_.split(",")).map(a => (a(0), a(1).trim().toInt))
    val joinRDD = bedPair.join(filtered)
    val ty = joinRDD.map { case (word, (file1Counts, file2Counts)) =>
      KmerIntesect(word, file1Counts, "xyz")
    }
    ty.registerTempTable("KmerIntesect")
    ty.saveAsParquetFile("hdfs://x/y/z/kmerIntersect.parquet")
  }
}

Re: upgrade from spark 1.2.1 to 1.3 on EC2 cluster and problems

2015-03-25 Thread Nick Pentreath
Ah I see now you are trying to use a spark 1.2 cluster - you will need to be 
running spark 1.3 on your EC2 cluster in order to run programs built against 
spark 1.3.




You will need to terminate and restart your cluster with spark 1.3 



—
Sent from Mailbox

On Wed, Mar 25, 2015 at 6:39 PM, roni roni.epi...@gmail.com wrote:

 Even if H2O and ADAM are dependent on 1.2.1, it should be backward
 compatible, right?
 So using 1.3 should not break them.
 And the code is not using the classes from those libs.
 I tried sbt clean compile .. same errror
 Thanks
 _R
 On Wed, Mar 25, 2015 at 9:26 AM, Nick Pentreath nick.pentre...@gmail.com
 wrote:
 What version of Spark do the other dependencies rely on (Adam and H2O?) -
 that could be it

 Or try sbt clean compile

 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Wed, Mar 25, 2015 at 5:58 PM, roni roni.epi...@gmail.com wrote:

 I have a EC2 cluster created using spark version 1.2.1.
 And I have a SBT project .
 Now I want to upgrade to spark 1.3 and use the new features.
 Below are issues .
 Sorry for the long post.
 Appreciate your help.
 Thanks
 -Roni

 Question - Do I have to create a new cluster using spark 1.3?

 Here is what I did -

 In my SBT file I  changed to -
 libraryDependencies += org.apache.spark %% spark-core % 1.3.0

 But then I started getting compilation error. along with
 Here are some of the libraries that were evicted:
 [warn]  * org.apache.spark:spark-core_2.10:1.2.0 - 1.3.0
 [warn]  * org.apache.hadoop:hadoop-client:(2.5.0-cdh5.2.0, 2.2.0) - 2.6.0
 [warn] Run 'evicted' to see detailed eviction warnings

  constructor cannot be instantiated to expected type;
 [error]  found   : (T1, T2)
 [error]  required: org.apache.spark.sql.catalyst.expressions.Row
 [error] val ty = joinRDD.map{case(word,
 (file1Counts, file2Counts)) = KmerIntesect(word, file1Counts,xyz)}
 [error]  ^

 Here is my SBT and code --
 SBT -

 version := 1.0

 scalaVersion := 2.10.4

 resolvers += Sonatype OSS Snapshots at 
 https://oss.sonatype.org/content/repositories/snapshots;;
 resolvers += Maven Repo1 at https://repo1.maven.org/maven2;;
 resolvers += Maven Repo at 
 https://s3.amazonaws.com/h2o-release/h2o-dev/master/1056/maven/repo/;;

 /* Dependencies - %% appends Scala version to artifactId */
 libraryDependencies += org.apache.hadoop % hadoop-client % 2.6.0
 libraryDependencies += org.apache.spark %% spark-core % 1.3.0
 libraryDependencies += org.bdgenomics.adam % adam-core % 0.16.0
 libraryDependencies += ai.h2o % sparkling-water-core_2.10 % 0.2.10


 CODE --
 import org.apache.spark.{SparkConf, SparkContext}
 case class KmerIntesect(kmer: String, kCount: Int, fileName: String)

 object preDefKmerIntersection {
   def main(args: Array[String]) {

  val sparkConf = new SparkConf().setAppName(preDefKmer-intersect)
  val sc = new SparkContext(sparkConf)
 import sqlContext.createSchemaRDD
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 val bedFile = sc.textFile(s3n://a/b/c,40)
  val hgfasta = sc.textFile(hdfs://a/b/c,40)
  val hgPair = hgfasta.map(_.split (,)).map(a= (a(0),
 a(1).trim().toInt))
  val filtered = hgPair.filter(kv = kv._2 == 1)
  val bedPair = bedFile.map(_.split (,)).map(a= (a(0),
 a(1).trim().toInt))
  val joinRDD = bedPair.join(filtered)
 val ty = joinRDD.map{case(word, (file1Counts,
 file2Counts)) = KmerIntesect(word, file1Counts,xyz)}
 ty.registerTempTable(KmerIntesect)
 ty.saveAsParquetFile(hdfs://x/y/z/kmerIntersect.parquet)
   }
 }




Re: iPython Notebook + Spark + Accumulo -- best practice?

2015-03-26 Thread Nick Pentreath
I'm guessing the Accumulo Key and Value classes are not serializable, so you 
would need to do something like 




val rdd = sc.newAPIHadoopRDD(...).map { case (key, value) =>
  (extractScalaType(key), extractScalaType(value)) }




Where 'extractScalaType' converts the Key or Value to a standard Scala type or
case class or whatever - basically it extracts the data from the Key or Value in a
form usable in Scala.



—
Sent from Mailbox

On Thu, Mar 26, 2015 at 8:59 PM, Russ Weeks rwe...@newbrightidea.com
wrote:

 Hi, David,
 This is the code that I use to create a JavaPairRDD from an Accumulo table:
 JavaSparkContext sc = new JavaSparkContext(conf);
 Job hadoopJob = Job.getInstance(conf, "TestSparkJob");
 hadoopJob.setInputFormatClass(AccumuloInputFormat.class);
 AccumuloInputFormat.setZooKeeperInstance(hadoopJob,
     conf.get(ZOOKEEPER_INSTANCE_NAME),
     conf.get(ZOOKEEPER_HOSTS)
 );
 AccumuloInputFormat.setConnectorInfo(hadoopJob,
     conf.get(ACCUMULO_AGILE_USERNAME),
     new PasswordToken(conf.get(ACCUMULO_AGILE_PASSWORD))
 );
 AccumuloInputFormat.setInputTableName(hadoopJob, conf.get(ACCUMULO_TABLE_NAME));
 AccumuloInputFormat.setScanAuthorizations(hadoopJob, auths);
 JavaPairRDD<Key, Value> values =
     sc.newAPIHadoopRDD(hadoopJob.getConfiguration(), AccumuloInputFormat.class,
         Key.class, Value.class);
 Key.class and Value.class are from org.apache.accumulo.core.data. I use a
 WholeRowIterator so that the Value is actually an encoded representation of
 an entire logical row; it's a useful convenience if you can be sure that
 your rows always fit in memory.
 I haven't tested it since Spark 1.0.1 but I doubt anything important has
 changed.
 Regards,
 -Russ
 On Thu, Mar 26, 2015 at 11:41 AM, David Holiday dav...@annaisystems.com
 wrote:
  * progress!*

 I was able to figure out why the 'input INFO not set' error was occurring.
 The eagle-eyed among you will no doubt see the following code is missing a
 closing ')'

 AbstractInputFormat.setConnectorInfo(jobConf, root, new 
 PasswordToken(password)

 as I'm doing this in spark-notebook, I'd been clicking the execute button
 and moving on because I wasn't seeing an error. what I forgot was that
 notebook is going to do what spark-shell will do when you leave off a
 closing ')' -- *it will wait forever for you to add it*. so the error was
 the result of the 'setConnectorInfo' method never getting executed.

 unfortunately, I'm still unable to shove the accumulo table data into an
 RDD that's useable to me. when I execute

 rddX.count

 I get back

 res15: Long = 10000

 which is the correct response - there are 10,000 rows of data in the table
 I pointed to. however, when I try to grab the first element of data thusly:

 rddX.first

 I get the following error:

 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 0.0 in stage 0.0 (TID 0) had a not serializable result:
 org.apache.accumulo.core.data.Key

 any thoughts on where to go from here?
  DAVID HOLIDAY
  Software Engineer
  760 607 3300 | Office
  312 758 8385 | Mobile
  dav...@annaisystems.com broo...@annaisystems.com



 www.AnnaiSystems.com

  On Mar 26, 2015, at 8:35 AM, David Holiday dav...@annaisystems.com
 wrote:

  hi Nick

  Unfortunately the Accumulo docs are woefully inadequate, and in some
 places, flat wrong. I'm not sure if this is a case where the docs are 'flat
 wrong', or if there's some wrinkle with spark-notebook in the mix that's
 messing everything up. I've been working with some people on stack overflow
 on this same issue (including one of the people from the spark-notebook
 team):


 http://stackoverflow.com/questions/29244530/how-do-i-create-a-spark-rdd-from-accumulo-1-6-in-spark-notebook?noredirect=1#comment46755938_29244530

  if you click the link you can see the entire thread of code, responses
 from notebook, etc. I'm going to try invoking the same techniques both from
 within a stand-alone scala problem and from the shell itself to see if I
 can get some traction. I'll report back when I have more data.

  cheers (and thx!)



 DAVID HOLIDAY
  Software Engineer
  760 607 3300 | Office
  312 758 8385 | Mobile
  dav...@annaisystems.com broo...@annaisystems.com


 www.AnnaiSystems.com http://www.annaisystems.com/

  On Mar 25, 2015, at 11:43 PM, Nick Pentreath nick.pentre...@gmail.com
 wrote:

  From a quick look at this link -
 http://accumulo.apache.org/1.6/accumulo_user_manual.html#_mapreduce - it
 seems you need to call some static methods on AccumuloInputFormat in order
 to set the auth, table, and range settings. Try setting these config
 options first and then call newAPIHadoopRDD?

 On Thu, Mar 26, 2015 at 2:34 AM, David Holiday dav...@annaisystems.com
 wrote:

 hi Irfan,

  thanks for getting back to me - i'll try the accumulo list to be sure.
 what is the normal use case for spark though? I'm surprised that hooking it
 into something as common and popular as accumulo isn't more of an every-day
 task.

 DAVID HOLIDAY
  Software Engineer
  760 607 3300

Re: hadoop input/output format advanced control

2015-03-24 Thread Nick Pentreath
You can indeed override the Hadoop configuration at a per-RDD level -
though it is a little more verbose, as in the below example, and you need
to effectively make a copy of the hadoop Configuration:

val thisRDDConf = new Configuration(sc.hadoopConfiguration)
thisRDDConf.set("mapred.min.split.size", "5")
val rdd = sc.newAPIHadoopFile(path,
  classOf[SequenceFileInputFormat[IntWritable, Text]],
  classOf[IntWritable],
  classOf[Text],
  thisRDDConf
)
println(rdd.partitions.size)

val rdd2 = sc.newAPIHadoopFile(path,
  classOf[SequenceFileInputFormat[IntWritable, Text]],
  classOf[IntWritable],
  classOf[Text]
)
println(rdd2.partitions.size)


For example, if I run the above on the following directory (some files I
have lying around):

-rw-r--r--  1 Nick  staff 0B Jul 11  2014 _SUCCESS
-rw-r--r--  1 Nick  staff   291M Sep 16  2014 part-0
-rw-r--r--  1 Nick  staff   227M Sep 16  2014 part-1
-rw-r--r--  1 Nick  staff   370M Sep 16  2014 part-2
-rw-r--r--  1 Nick  staff   244M Sep 16  2014 part-3
-rw-r--r--  1 Nick  staff   240M Sep 16  2014 part-4

I get output:

15/03/24 20:43:12 INFO FileInputFormat: Total input paths to process : 5
*5*

... and then for the second RDD:

15/03/24 20:43:12 INFO SparkContext: Created broadcast 1 from
newAPIHadoopFile at TestHash.scala:41
*45*

As expected.

Though a more succinct way of passing in those conf options would be nice -
but this should get you what you need.



On Mon, Mar 23, 2015 at 10:36 PM, Koert Kuipers ko...@tresata.com wrote:

 currently its pretty hard to control the Hadoop Input/Output formats used
 in Spark. The conventions seems to be to add extra parameters to all
 methods and then somewhere deep inside the code (for example in
 PairRDDFunctions.saveAsHadoopFile) all these parameters get translated into
 settings on the Hadoop Configuration object.

 for example for compression i see codec: Option[Class[_ :
 CompressionCodec]] = None added to a bunch of methods.

 how scalable is this solution really?

 for example i need to read from a hadoop dataset and i dont want the input
 (part) files to get split up. the way to do this is to set
 mapred.min.split.size. now i dont want to set this at the level of the
 SparkContext (which can be done), since i dont want it to apply to input
 formats in general. i want it to apply to just this one specific input
 dataset i need to read. which leaves me with no options currently. i could
 go add yet another input parameter to all the methods
 (SparkContext.textFile, SparkContext.hadoopFile, SparkContext.objectFile,
 etc.). but that seems ineffective.

 why can we not expose a Map[String, String] or some other generic way to
 manipulate settings for hadoop input/output formats? it would require
 adding one more parameter to all methods to deal with hadoop input/output
 formats, but after that its done. one parameter to rule them all

 then i could do:
  val x = sc.textFile("/some/path", formatSettings =
    Map("mapred.min.split.size" -> "12345"))

  or

  rdd.saveAsTextFile("/some/path", formatSettings =
    Map("mapred.output.compress" -> "true", "mapred.output.compression.codec" ->
      "somecodec"))



Re: iPython Notebook + Spark + Accumulo -- best practice?

2015-03-26 Thread Nick Pentreath
From a quick look at this link -
http://accumulo.apache.org/1.6/accumulo_user_manual.html#_mapreduce - it
seems you need to call some static methods on AccumuloInputFormat in order
to set the auth, table, and range settings. Try setting these config
options first and then call newAPIHadoopRDD?

On Thu, Mar 26, 2015 at 2:34 AM, David Holiday dav...@annaisystems.com
wrote:

  hi Irfan,

  thanks for getting back to me - i'll try the accumulo list to be sure.
 what is the normal use case for spark though? I'm surprised that hooking it
 into something as common and popular as accumulo isn't more of an every-day
 task.

 DAVID HOLIDAY
  Software Engineer
  760 607 3300 | Office
  312 758 8385 | Mobile
  dav...@annaisystems.com broo...@annaisystems.com



 www.AnnaiSystems.com

  On Mar 25, 2015, at 5:27 PM, Irfan Ahmad ir...@cloudphysics.com wrote:

  Hmmm this seems very accumulo-specific, doesn't it? Not sure how to
 help with that.


  *Irfan Ahmad*
 CTO | Co-Founder | *CloudPhysics* http://www.cloudphysics.com/
 Best of VMworld Finalist
  Best Cloud Management Award
  NetworkWorld 10 Startups to Watch
 EMA Most Notable Vendor

 On Tue, Mar 24, 2015 at 4:09 PM, David Holiday dav...@annaisystems.com
 wrote:

 hi all,

  got a vagrant image with spark notebook, spark, accumulo, and hadoop
 all running. from notebook I can manually create a scanner and pull test
 data from a table I created using one of the accumulo examples:

 val instanceNameS = "accumulo"
 val zooServersS = "localhost:2181"
 val instance: Instance = new ZooKeeperInstance(instanceNameS, zooServersS)
 val connector: Connector = instance.getConnector("root", new PasswordToken("password"))
 val auths = new Authorizations("exampleVis")
 val scanner = connector.createScanner("batchtest1", auths)

 scanner.setRange(new Range("row_00", "row_10"))
 for (entry: Entry[Key, Value] <- scanner) {
   println(entry.getKey + " is " + entry.getValue)
 }

 will give the first ten rows of table data. when I try to create the RDD
 thusly:

 val rdd2 =
   sparkContext.newAPIHadoopRDD (
 new Configuration(),
 classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
 classOf[org.apache.accumulo.core.data.Key],
 classOf[org.apache.accumulo.core.data.Value]
   )

 I get an RDD returned to me that I can't do much with due to the
 following error:

 java.io.IOException: Input info has not been set. at
 org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630)
 at
 org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343)
 at
 org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538)
 at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) at
 scala.Option.getOrElse(Option.scala:120) at
 org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at
 org.apache.spark.SparkContext.runJob(SparkContext.scala:1367) at
 org.apache.spark.rdd.RDD.count(RDD.scala:927)

 which totally makes sense in light of the fact that I haven't specified
 any parameters as to which table to connect with, what the auths are, etc.

 so my question is: what do I need to do from here to get those first ten
 rows of table data into my RDD?



  DAVID HOLIDAY
  Software Engineer
  760 607 3300 | Office
  312 758 8385 | Mobile
  dav...@annaisystems.com broo...@annaisystems.com


 www.AnnaiSystems.com http://www.annaisystems.com/

   On Mar 19, 2015, at 11:25 AM, David Holiday dav...@annaisystems.com
 wrote:

  kk - I'll put something together and get back to you with more :-)

 DAVID HOLIDAY
  Software Engineer
  760 607 3300 | Office
  312 758 8385 | Mobile
  dav...@annaisystems.com broo...@annaisystems.com


 www.AnnaiSystems.com http://www.annaisystems.com/

  On Mar 19, 2015, at 10:59 AM, Irfan Ahmad ir...@cloudphysics.com
 wrote:

  Once you setup spark-notebook, it'll handle the submits for interactive
 work. Non-interactive is not handled by it. For that spark-kernel could be
 used.

  Give it a shot ... it only takes 5 minutes to get it running in
 local-mode.


  *Irfan Ahmad*
 CTO | Co-Founder | *CloudPhysics* http://www.cloudphysics.com/
 Best of VMworld Finalist
  Best Cloud Management Award
  NetworkWorld 10 Startups to Watch
 EMA Most Notable Vendor

 On Thu, Mar 19, 2015 at 9:51 AM, David Holiday dav...@annaisystems.com
 wrote:

 hi all - thx for the alacritous replies! so regarding how to get things
 from notebook to spark and back, am I correct that spark-submit is the way
 to go?

 DAVID HOLIDAY
  Software Engineer
  760 607 3300 | Office
  312 758 8385 | Mobile
  dav...@annaisystems.com broo...@annaisystems.com


 www.AnnaiSystems.com http://www.annaisystems.com/

  On Mar 19, 2015, at 

Re: StackOverflow Problem with 1.3 mllib ALS

2015-04-02 Thread Nick Pentreath
Fair enough but I'd say you hit that diminishing return after 20 iterations
or so... :)

On Thu, Apr 2, 2015 at 9:39 AM, Justin Yip yipjus...@gmail.com wrote:

 Thanks Xiangrui,

 I used 80 iterations to demonstrates the marginal diminishing return in
 prediction quality :)

 Justin
 On Apr 2, 2015 00:16, Xiangrui Meng men...@gmail.com wrote:

 I think before 1.3 you also get the stackoverflow problem in > ~35
 iterations. In 1.3.x, please use setCheckpointInterval to solve this
 problem, which is available in the current master and 1.3.1 (to be
 released soon). Btw, do you find 80 iterations are needed for
 convergence? -Xiangrui
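
 A short sketch of what that looks like (setCheckpointInterval needs 1.3.1 or later;
 the checkpoint directory is illustrative and ratings is assumed to be your
 RDD[Rating]):

   import org.apache.spark.mllib.recommendation.ALS

   sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

   val model = new ALS()
     .setRank(10)
     .setIterations(80)
     .setLambda(0.01)
     .setCheckpointInterval(10)  // truncate the lineage every 10 iterations
     .run(ratings)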

 On Wed, Apr 1, 2015 at 11:54 PM, Justin Yip yipjus...@prediction.io
 wrote:
  Hello,
 
  I have been using MLlib's ALS in 1.2 and it works quite well. I have
 just
  upgraded to 1.3 and I encountered a stackoverflow problem.

  After some digging, I realized that when the iterations > ~35, I will get
  the overflow problem. However, I can get at least 80 iterations with ALS in
 1.2.
 
  Is there any change to the ALS algorithm? And are there any ways to
 achieve
  more iterations?
 
  Thanks.
 
  Justin

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: RE: ElasticSearch for Spark times out

2015-04-22 Thread Nick Pentreath
Is your ES cluster reachable from your Spark cluster via network / firewall? 
Can you run the same query from the spark master and slave nodes via curl / one 
of the other clients?




Seems odd that GC issues would be a problem from the scan but not when running the
query from a browser plugin... Sounds like it could be a network issue.
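
A quick way to check that from the Spark nodes themselves (host, port and index name
are whatever your ES cluster actually uses):

  curl -s http://es-host:9200/_cluster/health?pretty
  curl -s "http://es-host:9200/myindex/_search?q=*&size=1"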



—
Sent from Mailbox

On Thu, Apr 23, 2015 at 5:11 AM, Otis Gospodnetic
otis.gospodne...@gmail.com wrote:

 Hi,
 If you get ES response back in 1-5 seconds that's pretty slow.  Are these
 ES aggregation queries?  Costin may be right about GC possibly causing
 timeouts.  SPM http://sematext.com/spm/ can give you all Spark and all
 key Elasticsearch metrics, including various JVM metrics.  If the problem
 is GC, you'll see it.  If you monitor both Spark side and ES side, you
 should be able to find some correlation with SPM.
 Otis
 --
 Monitoring * Alerting * Anomaly Detection * Centralized Log Management
 Solr  Elasticsearch Support * http://sematext.com/
 On Wed, Apr 22, 2015 at 5:43 PM, Costin Leau costin.l...@gmail.com wrote:
 Hi,

 First off, for Elasticsearch questions is worth pinging the Elastic
 mailing list as that is closer monitored than this one.

 Back to your question, Jeetendra is right that the exception indicates
 no data is flowing back to the es-connector and
 Spark.
 The default is 1m [1] which should be more than enough for a typical
 scenario. As a side note, the scroll size is 50 per
 task
 (so 150 suggests 3 tasks).

 Once the query is made, scrolling the document is fast - likely there's
 something else at hand that causes the
 connection to timeout.
 In such cases, you can enable logging on the REST package and see what
 type of data transfer occurs between ES and Spark.

 Do note that if a GC occurs, that can freeze Elastic (or Spark) which
 might trigger the timeout. Consider monitoring
 Elasticsearch during
 the query and see whether anything jumps - in particular the memory
 pressure.

 Hope this helps,

 [1]
 http://www.elastic.co/guide/en/elasticsearch/hadoop/master/configuration.html#_network

 On 4/22/15 10:44 PM, Adrian Mocanu wrote:

 Hi

 Thanks for the help. My ES is up.

 Out of curiosity, do you know what the timeout value is? There are
 probably other things happening to cause the timeout;
 I don’t think my ES is that slow but it’s possible that ES is taking too
 long to find the data. What I see happening is
 that it uses scroll to get the data from ES; about 150 items at a
 time.Usual delay when I perform the same query from a
 browser plugin ranges from 1-5sec.

 Thanks

 *From:*Jeetendra Gangele [mailto:gangele...@gmail.com]
 *Sent:* April 22, 2015 3:09 PM
 *To:* Adrian Mocanu
 *Cc:* u...@spark.incubator.apache.org
 *Subject:* Re: ElasticSearch for Spark times out

 Basically a read timeout means that no data arrived within the specified
 receive timeout period.

 Few thing I would suggest

 1.are your ES cluster Up and running?

 2. If 1 is yes, then reduce the size of the index, make it a few KB, and
 then test?

 On 23 April 2015 at 00:19, Adrian Mocanu amoc...@verticalscope.com
 mailto:amoc...@verticalscope.com wrote:

 Hi

 I use the ElasticSearch package for Spark and very often it times out
 reading data from ES into an RDD.

 How can I keep the connection alive (why doesn’t it? Bug?)

 Here’s the exception I get:

 org.elasticsearch.hadoop.serialization.EsHadoopSerializationException:
 java.net.SocketTimeoutException: Read timed out

  at
 org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.serialization.ParsingUtils.doSeekToken(ParsingUtils.java:70)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.serialization.ParsingUtils.seek(ParsingUtils.java:58)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:149)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
 ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3]

  at
 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 

Re: MLlib -Collaborative Filtering

2015-04-20 Thread Nick Pentreath
You will have to get the two user factor vectors from the ALS model and
compute the cosine similarity between them. You can do this using Breeze
vectors:

import breeze.linalg._
// userFactors is the ALS model's user factor RDD (e.g. model.userFeatures);
// user1 and user2 are the two user IDs
val user1Vec = new DenseVector[Double](userFactors.lookup(user1).head)
val user2Vec = new DenseVector[Double](userFactors.lookup(user2).head)
val sim = user1Vec.t * user2Vec / (norm(user1Vec) * norm(user2Vec))


There is no built-in way currently to compute user or item similarities,
though there is a PR working on it:
https://github.com/apache/spark/pull/3536



On Sun, Apr 19, 2015 at 7:29 PM, Christian S. Perone 
christian.per...@gmail.com wrote:

 The easiest way to do that is to use a similarity metric between the
 different user factors.

 On Sat, Apr 18, 2015 at 7:49 AM, riginos samarasrigi...@gmail.com wrote:

 Is there any way that i can see the similarity table of 2 users in that
 algorithm? by that i mean the similarity between 2 users



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-Collaborative-Filtering-tp22553.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --
 Blog http://blog.christianperone.com | Github
 https://github.com/perone | Twitter https://twitter.com/tarantulae
 Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big
 joke on me.



Re: solr in spark

2015-04-28 Thread Nick Pentreath
I haven't used Solr for a long time, and haven't used Solr in Spark.

However, why do you say Elasticsearch is not a good option ...? ES
absolutely supports full-text search and not just filtering and grouping
(in fact its original purpose was and still is text search, though
filtering, grouping and aggregation are heavily used).
http://www.elastic.co/guide/en/elasticsearch/guide/master/full-text-search.html



On Tue, Apr 28, 2015 at 6:27 PM, Jeetendra Gangele gangele...@gmail.com
wrote:

 Has anyone tried using Solr inside Spark?
 Below is the project describing it:
 https://github.com/LucidWorks/spark-solr.

 I have a requirement in which I want to index 20 million company names
 and then search them as and when new data comes in. The output should be a
 list of companies matching the query.

 Spark has inbuilt Elasticsearch support, but for this purpose Elasticsearch is
 not a good option since this is purely a text search problem?

 Elasticsearch is good for filtering and grouping.

 Has anybody used Solr inside Spark?

 Regards
 jeetendra




Re: solr in spark

2015-04-28 Thread Nick Pentreath
Depends on your use case and search volume. Typically you'd have a dedicated ES 
cluster if your app is doing a lot of real time indexing and search.




If it's only for spark integration then you could colocate ES and spark



—
Sent from Mailbox

On Tue, Apr 28, 2015 at 6:41 PM, Jeetendra Gangele gangele...@gmail.com
wrote:

 Thanks for reply.
 Will the Elasticsearch index be within my cluster, or do I need to host
 Elasticsearch separately?
 On 28 April 2015 at 22:03, Nick Pentreath nick.pentre...@gmail.com wrote:
 I haven't used Solr for a long time, and haven't used Solr in Spark.

 However, why do you say Elasticsearch is not a good option ...? ES
 absolutely supports full-text search and not just filtering and grouping
 (in fact its original purpose was and still is text search, though
 filtering, grouping and aggregation are heavily used).
 http://www.elastic.co/guide/en/elasticsearch/guide/master/full-text-search.html



 On Tue, Apr 28, 2015 at 6:27 PM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 Has anyone tried using Solr inside Spark?
 Below is the project describing it:
 https://github.com/LucidWorks/spark-solr.

 I have a requirement in which I want to index 20 million company names
 and then search them as and when new data comes in. The output should be a
 list of companies matching the query.

 Spark has inbuilt Elasticsearch support, but for this purpose Elasticsearch is
 not a good option since this is purely a text search problem?

 Elasticsearch is good for filtering and grouping.

 Has anybody used Solr inside Spark?

 Regards
 jeetendra




Re: Content based filtering

2015-05-12 Thread Nick Pentreath
Content based filtering is a pretty broad term - do you have any particular 
approach in mind?




MLlib does not have any purely content-based methods. Your main alternative is 
ALS collaborative filtering.




However, using a system like Oryx / PredictionIO / elasticsearch etc you can 
combine factor-based collaborative filtering with content-based pre- and 
post-filtering steps (eg filter recommendations by geolocation, price, category 
and so on). 



—
Sent from Mailbox

On Tue, May 12, 2015 at 1:45 PM, Yasemin Kaya godo...@gmail.com wrote:

 Hi, is Content based filtering available for Spark in Mllib? If it isn't ,
 what can I use as an alternative? Thank you.
 Have a nice day
 yasemin
 -- 
 hiç ender hiç

Re: Passing Elastic Search Mappings in Spark Conf

2015-04-15 Thread Nick Pentreath
If you want to specify mapping you must first create the mappings for your 
index types before indexing.




As far as I know there is no way to specify this via ES-hadoop. But it's best 
practice to explicitly create mappings prior to indexing, or to use index 
templates when dynamically creating indexes.
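
For example, something along these lines before kicking off the Spark job (a
rough sketch using only the JDK HTTP client; the index, type and field names
here are made up for illustration):

import java.net.{HttpURLConnection, URL}

val mappingJson =
  """{
    |  "mappings": {
    |    "docs": {
    |      "properties": {
    |        "CREATED_DATE": { "type": "date" },
    |        "company":      { "type": "string", "index": "not_analyzed" }
    |      }
    |    }
    |  }
    |}""".stripMargin

// PUT the index with its mapping before any indexing from Spark
val conn = new URL("http://localhost:9200/myindex")
  .openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("PUT")
conn.setDoOutput(true)
conn.setRequestProperty("Content-Type", "application/json")
conn.getOutputStream.write(mappingJson.getBytes("UTF-8"))
println(s"create index response: ${conn.getResponseCode}")
conn.disconnect()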



—
Sent from Mailbox

On Thu, Apr 16, 2015 at 1:14 AM, Deepak Subhramanian
deepak.subhraman...@gmail.com wrote:

 Hi,
 Is there a way to pass the mapping to define a field as not analyzed
 with es-spark settings.
 I am just wondering if I can set the mapping type for a field as not
 analyzed using the set function in spark conf as similar to the other
 es settings.
 val sconf = new SparkConf()
   .setMaster("local[1]")
   .setAppName("Load Data To ES")
   .set("spark.ui.port", "4141")
   .set("es.index.auto.create", "true")
   .set("es.net.http.auth.user", "es_admin")
   .set("es.index.auto.create", "true")
   .set("es.mapping.names", "CREATED_DATE:@timestamp")
 Thanks,
 Deepak Subhramanian
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

Re: When querying ElasticSearch, score is 0

2015-04-18 Thread Nick Pentreath
ES-hadoop uses a scan & scroll search to efficiently retrieve large result 
sets. Scores are not tracked in a scan and sorting is not supported, hence the 0 
scores.





http://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html#scroll-scan










—
Sent from Mailbox

On Thu, Apr 16, 2015 at 10:46 PM, Andrejs Abele
andrejs.ab...@insight-centre.org wrote:

 Hi,
 I have data in my ElasticSearch server, when I query it using rest
 interface, I get results and score for each result, but when I run the
 same query in spark using ElasticSearch API,  I get results and meta
 data, but the score is shown 0 for each record.
 My configuration is
 ...
 val conf = new SparkConf()
   .setMaster("local[6]")
   .setAppName("DBpedia to ElasticSearch")
   .set("es.index.auto.create", "true")
   .set("es.field.read.empty.as.null", "true")
   .set("es.read.metadata", "true")
 ...
 val sc = new SparkContext(conf)
 val test = Map("query" -> "{\n\"query\":{\n \"fuzzy_like_this\" : {\n \"fields\" : [\"label\"],\n \"like_text\" : \"102nd Ohio Infantry\" }\n  } \n}")
 val mYRDD = sc.esRDD("dbpedia/docs", test.get("query").get)
 Sample output:
 Map(id -> http://dbpedia.org/resource/Alert,_Ohio, label -> Alert, Ohio, 
 category -> Unincorporated communities in Ohio, abstract -> Alert is an 
 unincorporated community in southern Morgan Township, Butler County, Ohio, in 
 the United States. It is located about ten miles southwest of Hamilton on 
 Howards Creek, a tributary of the Great Miami River in section 28 of R1ET3N 
 of the Congress Lands. It is three miles west of Shandon and two miles south 
 of Okeana., _metadata -> Map(_index -> dbpedia, _type -> docs, _id -> 
 AUy5aQs7895C6HE5GmG4, _score -> 0.0))
 As you can see _score is 0.
 Would appreciate any help,
 Cheers,
 Andrejs 

Re: MLlib -Collaborative Filtering

2015-04-18 Thread Nick Pentreath
What do you mean by similarity table of 2 users?




Do you mean the similarity between 2 users?



—
Sent from Mailbox

On Sat, Apr 18, 2015 at 11:09 AM, riginos samarasrigi...@gmail.com
wrote:

 Is there any way that i can see the similarity table of 2 users in that
 algorithm? 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-Collaborative-Filtering-tp22552.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

Re: Difference between textFile Vs hadoopFile (textInoutFormat) on HDFS data

2015-04-07 Thread Nick Pentreath
There is no difference - textFile calls hadoopFile with a TextInputFormat, and 
maps each value to a String. 
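
Roughly, the equivalence being described looks like this (a sketch, not
Spark's exact source; the path is made up):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

val viaTextFile = sc.textFile("hdfs:///data/input")
val viaHadoopFile = sc
  .hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/input")
  .map { case (_, text) => text.toString }  // textFile just drops the byte-offset key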



—
Sent from Mailbox

On Tue, Apr 7, 2015 at 1:46 PM, Puneet Kumar Ojha
puneet.ku...@pubmatic.com wrote:

 Hi ,
 Is there any difference between textFile vs hadoopFile
 (textInputFormat) when data is present in HDFS? Will there be any performance 
 gain that can be observed?
 Puneet Kumar Ojha
 Data Architect | PubMatic http://www.pubmatic.com/

Re: Migrating from Spark 0.8.0 to Spark 1.3.0

2015-04-04 Thread Nick Pentreath
It shouldn't be too bad - pertinent changes & migration notes are here: 
http://spark.apache.org/docs/1.0.0/programming-guide.html#migrating-from-pre-10-versions-of-spark
 for pre-1.0 and here: 
http://spark.apache.org/docs/latest/sql-programming-guide.html#upgrading-from-spark-sql-10-12-to-13
 for SparkSQL pre-1.3




Since you aren't using SparkSQL the 2nd link is probably not useful. 




Generally you should find very few changes in the core API but things like 
MLlib would have changed a fair bit - though again the API should have been 
relatively stable.




Your biggest change is probably going to be running jobs through spark-submit 
rather than spark-class etc: 
http://spark.apache.org/docs/latest/submitting-applications.html
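
For example, a job previously launched via spark-class would now be submitted
with something like the following (the class name, master URL and jar path are
made up):

  ./bin/spark-submit \
    --class com.example.MyApp \
    --master spark://master-host:7077 \
    --executor-memory 4G \
    /path/to/myapp.jar arg1 arg2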










—
Sent from Mailbox

On Sat, Apr 4, 2015 at 1:11 AM, Ritesh Kumar Singh
riteshoneinamill...@gmail.com wrote:

 Hi,
 Are there any tutorials that explain all the changes between Spark
 0.8.0 and Spark 1.3.0, and how should we approach this issue?

Re: Spark and Google Cloud Storage

2015-06-18 Thread Nick Pentreath
I believe it is available here:
https://cloud.google.com/hadoop/google-cloud-storage-connector

2015-06-18 15:31 GMT+02:00 Klaus Schaefers klaus.schaef...@ligatus.com:

 Hi,

 is there some kind of adapter to use Google Cloud Storage with Spark?


 Cheers,

 Klaus

 --

 --

 Klaus Schaefers
 Senior Optimization Manager

 Ligatus GmbH
 Hohenstaufenring 30-32
 D-50674 Köln

 Tel.:  +49 (0) 221 / 56939 -784
 Fax:  +49 (0) 221 / 56 939 - 599
 E-Mail: klaus.schaef...@ligatus.com
 Web: www.ligatus.de

 HRB Köln 56003
 Geschäftsführung:
 Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
 Dipl.-Wirtschaftsingenieur Arne Wolter



Re: Spark Titan

2015-06-21 Thread Nick Pentreath
 Something like this works (or at least worked with titan 0.4 back when I
 was using it):


 val graph = sc.newAPIHadoopRDD(
   configuration,
   fClass = classOf[TitanHBaseInputFormat],
   kClass = classOf[NullWritable],
   vClass = classOf[FaunusVertex])
 graph.flatMap { vertex =>
   val edges = vertex.getEdges(Direction.OUT).filter(e => e.getLabel == ...)
   edges.map { edge => (...) }
 }

 Note that FaunusVertex is not Serializable so you'll need to extract the
 properties (or say a JSON representation) of your vertices in the first map
 or flatMap operation (or extract your edges and properties).


 On Sun, Jun 21, 2015 at 6:57 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Have a look at
 http://s3.thinkaurelius.com/docs/titan/0.5.0/titan-io-format.html You
 could use those Input/Output formats with newAPIHadoopRDD api call.

 Thanks
 Best Regards

 On Sun, Jun 21, 2015 at 8:50 PM, Madabhattula Rajesh Kumar 
 mrajaf...@gmail.com wrote:

 Hi,

 How to connect to a Titan database from Spark? Are there any out-of-the-box
 APIs available?

 Regards,
 Rajesh






Re: Velox Model Server

2015-06-24 Thread Nick Pentreath
Ok

My view is that with only 100k items, you are better off serving item
vectors in-memory, i.e. store all item vectors in memory and compute the user *
item score on-demand. In most applications only a small proportion of users
are active, so really you don't need all 10m user vectors in memory. They
could be looked up from a K-V store and have an LRU cache in memory for say
1m of those. Optionally also update them as feedback comes in.
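
A minimal sketch of that serving idea (Breeze for the linear algebra; the
class, names and sizes are illustrative only, and the plain Map stands in for
the K-V store / LRU cache of user vectors):

import breeze.linalg.{DenseMatrix, DenseVector, argtopk}

class InMemoryScorer(itemIds: Array[Int], itemFactors: DenseMatrix[Double]) {
  // itemFactors is rank x numItems, one column per item
  def recommend(userFactor: Array[Double], k: Int): Seq[(Int, Double)] = {
    val scores = itemFactors.t * DenseVector(userFactor)  // one score per item
    argtopk(scores, k).map(i => (itemIds(i), scores(i)))
  }
}

val rank = 50
val userStore: Map[Int, Array[Double]] = Map(42 -> Array.fill(rank)(0.1))
val scorer = new InMemoryScorer(Array(1, 2, 3), DenseMatrix.rand(rank, 3))
userStore.get(42).foreach(u => println(scorer.recommend(u, 2)))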

As far as I can see, this is pretty much what velox does except it
partitions all user vectors across nodes to scale.

Oryx does almost the same but Oryx1 kept all user and item vectors in
memory (though I am not sure about whether Oryx2 still stores all user and
item vectors in memory or partitions in some way).

Deb, we are using a custom Akka-based model server (with Scalatra
frontend). It is more focused on many small models in-memory (largest of
these is around 5m user vectors, 100k item vectors, with factor size
20-50). We use Akka cluster sharding to allow scale-out across nodes if
required. We have a few hundred models comfortably powered by m3.xlarge AWS
instances. Using floats you could probably have all of your factors in
memory on one 64GB machine (depending on how many models you have).

Our solution is not that generic and a little hacked-together - but I'd be
happy to chat offline about sharing what we've done. I think it still has a
basic client to the Spark JobServer which would allow triggering
re-computation jobs periodically. We currently just run batch
re-computation and reload factors from S3 periodically.

We then use Elasticsearch to post-filter results and blend content-based
stuff - which I think might be more efficient than SparkSQL for this
particular purpose.

On Wed, Jun 24, 2015 at 8:59 AM, Debasish Das debasish.da...@gmail.com
wrote:

 Model sizes are 10m x rank, 100k x rank range.

 For recommendation/topic modeling I can run batch recommendAll and then
 keep serving the model using a distributed cache but then I can't
 incorporate per user model re-predict if user feedback is making the
 current topk stale. I have to wait for next batch refresh which might be 1
 hr away.

 spark job server + spark sql can get me fresh updates but each time
 running a predict might be slow.

 I am guessing the better idea might be to start with batch recommendAll
 and then update the per-user model if it gets stale, but that needs access to
 the key value store and the model over an API like spark job server. I am
 running experiments with job server. In general it will be nice if my key
 value store and model are both managed by same akka based API.

 Yes sparksql is to filter/boost recommendation results using business
 logic like user demography for example..
 On Jun 23, 2015 2:07 AM, Sean Owen so...@cloudera.com wrote:

 Yes, and typically needs are <100ms. Now imagine even 10 concurrent
 requests. My experience has been that this approach won't nearly
 scale. The best you could probably do is async mini-batch
 near-real-time scoring, pushing results to some store for retrieval,
 which could be entirely suitable for your use case.

 On Tue, Jun 23, 2015 at 8:52 AM, Nick Pentreath
 nick.pentre...@gmail.com wrote:
  If your recommendation needs are real-time (<1s) I am not sure job
 server
  and computing the refs with spark will do the trick (though those new
  BLAS-based methods may have given sufficient speed up).




Re: Matrix Multiplication and mllib.recommendation

2015-06-18 Thread Nick Pentreath
Yup, numpy calls into BLAS for matrix multiply.

Sent from my iPad

 On 18 Jun 2015, at 8:54 PM, Ayman Farahat ayman.fara...@yahoo.com wrote:
 
 Thanks all for the help. 
 It turned out that using the numpy matrix multiplication made a huge 
 difference in performance. I suspect that Numpy already uses BLAS optimized 
 code. 
 
 Here is Python code
 
 #This is where i load and directly test the predictions
 myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
 m1 = myModel.productFeatures().sample(False, 1.00)
 m2 = m1.map(lambda (user,feature) : feature).collect()
 m3 = matrix(m2).transpose()
 
 pf = sc.broadcast(m3)
 uf = myModel.userFeatures()
 
 f1 = uf.map(lambda (userID, features): (userID, 
 squeeze(asarray(matrix(array(features)) * pf.value))))
 dog = f1.count()
 
 On Jun 18, 2015, at 8:42 AM, Debasish Das debasish.da...@gmail.com wrote:
 
 Also in my experiments, it's much faster to do blocked BLAS through cartesian 
 rather than doing sc.union. Here are the details on the experiments:
 
 https://issues.apache.org/jira/browse/SPARK-4823
 
 On Thu, Jun 18, 2015 at 8:40 AM, Debasish Das debasish.da...@gmail.com 
 wrote:
 Also not sure how threading helps here because Spark puts a partition to 
 each core. On each core may be there are multiple threads if you are using 
 intel hyperthreading but I will let Spark handle the threading.  
 
 On Thu, Jun 18, 2015 at 8:38 AM, Debasish Das debasish.da...@gmail.com 
 wrote:
 We added SPARK-3066 for this. In 1.4 you should get the code to do BLAS 
 dgemm based calculation.
 
 On Thu, Jun 18, 2015 at 8:20 AM, Ayman Farahat 
 ayman.fara...@yahoo.com.invalid wrote:
 
 Thanks Sabarish and Nick
 Would you happen to have some code snippets that you can share. 
 Best
 Ayman
 
 On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:
 
 Nick is right. I too have implemented this way and it works just fine. 
 In my case, there can be even more products. You simply broadcast blocks 
 of products to userFeatures.mapPartitions() and BLAS multiply in there 
 to get recommendations. In my case 10K products form one block. Note 
 that you would then have to union your recommendations. And if there are 
 lots of product blocks, you might also want to checkpoint once every few 
 times.
 
 Regards
 Sab
 
 On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:
 One issue is that you broadcast the product vectors and then do a dot 
 product one-by-one with the user vector.
 
 You should try forming a matrix of the item vectors and doing the dot 
 product as a matrix-vector multiply which will make things a lot faster.
 
 Another optimisation that is available in 1.4 is a recommendProducts 
 method that blockifies the factors to make use of level 3 BLAS (i.e. 
 matrix-matrix multiply). I am not sure if this is available in the 
 Python API yet. 
 
 But you can do a version yourself by using mapPartitions over user 
 factors, blocking the factors into sub-matrices and doing matrix 
 multiply with item factor matrix to get scores on a block-by-block 
 basis.
 
 Also as Ilya says more parallelism can help. I don't think it's so 
 necessary to do LSH with 30,000 items.
 
 —
 Sent from Mailbox
 
 
 On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya 
 ilya.gane...@capitalone.com wrote:
 I actually talk about this exact thing in a blog post here 
 http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/.
  Keep in mind, you're actually doing a ton of math. Even with proper 
 caching and use of broadcast variables this will take a while 
 depending on the size of your cluster. To get real results you may 
 want to look into locality sensitive hashing to limit your search 
 space and definitely look into spinning up multiple threads to process 
 your product features in parallel to increase resource utilization on 
 the cluster.
 
 
 
 Thank you,
 Ilya Ganelin
 
 
 
 -Original Message-
 From: afarahat [ayman.fara...@yahoo.com]
 Sent: Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
 To: user@spark.apache.org
 Subject: Matrix Multiplication and mllib.recommendation
 
 Hello;
 I am trying to get predictions after running the ALS model.
 The model works fine. In the prediction/recommendation, I have about 
 30,000 products and 90 million users.
 When i try the predict all it fails.
 I have been trying to formulate the problem as a Matrix multiplication 
 where
 I first get the product features, broadcast them and then do a dot 
 product.
 Its still very slow. Any reason why
 here is a sample code
 
 def doMultiply(x):
 a = []
 #multiply by
 mylen = len(pf.value)
 for i in range(mylen) :
   myprod = numpy.dot(x,pf.value[i][1])
   a.append(myprod)
 return a
 
 
 myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
 #I need to select which products to broadcast but lets try all
 m1

Re: Velox Model Server

2015-06-22 Thread Nick Pentreath
How large are your models?




Spark job server does allow synchronous job execution and with a warm 
long-lived context it will be quite fast - but still in the order of a second 
or a few seconds usually (depending on model size - for very large models 
possibly quite a lot more than that).




What are your use cases for SQL during recommendation? Filtering?





If your recommendation needs are real-time (<1s) I am not sure job server and 
computing the refs with spark will do the trick (though those new BLAS-based 
methods may have given sufficient speed up).



—
Sent from Mailbox

On Mon, Jun 22, 2015 at 11:17 PM, Debasish Das debasish.da...@gmail.com
wrote:

 Models that I am looking for are mostly factorization based models (which
 includes both recommendation and topic modeling use-cases).
 For recommendation models, I need a combination of Spark SQL and ml model
 prediction api...I think spark job server is what I am looking for and it
 has fast http rest backend through spray which will scale fine through akka.
 Out of curiosity why netty?
 What model are you serving?
 Velox doesn't look like it is optimized for cases like ALS recs, if that's
 what you mean. I think scoring ALS at scale in real time takes a fairly
 different approach.
 The servlet engine probably doesn't matter at all in comparison.
 On Sat, Jun 20, 2015, 9:40 PM Debasish Das debasish.da...@gmail.com wrote:
 After getting used to Scala, writing Java is too much work :-)

 I am looking for scala based project that's using netty at its core (spray
 is one example).

 prediction.io is an option but that also looks quite complicated and not
 using all the ML features that got added in 1.3/1.4

 Velox built on top of ML / Keystone ML pipeline API and that's useful but
 it is still using javax servlets which is not netty based.

 On Sat, Jun 20, 2015 at 10:25 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Oops, that link was for Oryx 1. Here's the repo for Oryx 2:
 https://github.com/OryxProject/oryx

 On Sat, Jun 20, 2015 at 10:20 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Debasish,

 The Oryx project (https://github.com/cloudera/oryx), which is Apache 2
 licensed, contains a model server that can serve models built with MLlib.

 -Sandy

 On Sat, Jun 20, 2015 at 8:00 AM, Charles Earl charles.ce...@gmail.com
 wrote:

 Is velox NOT open source?


 On Saturday, June 20, 2015, Debasish Das debasish.da...@gmail.com
 wrote:

 Hi,

 The demo of end-to-end ML pipeline including the model server
 component at Spark Summit was really cool.

 I was wondering if the Model Server component is based upon Velox or
 it uses a completely different architecture.

 https://github.com/amplab/velox-modelserver

 We are looking for an open source version of model server to build
 upon.

 Thanks.
 Deb



 --
 - Charles






Re: Velox Model Server

2015-06-21 Thread Nick Pentreath
Is there a presentation up about this end-to-end example?




I'm looking into velox now - our internal model pipeline just saves factors to 
S3 and model server loads them periodically from S3



—
Sent from Mailbox

On Sat, Jun 20, 2015 at 9:46 PM, Debasish Das debasish.da...@gmail.com
wrote:

 Integration of model server with ML pipeline API.
 On Sat, Jun 20, 2015 at 12:25 PM, Donald Szeto don...@prediction.io wrote:
 Mind if I ask what 1.3/1.4 ML features that you are looking for?


 On Saturday, June 20, 2015, Debasish Das debasish.da...@gmail.com wrote:

 After getting used to Scala, writing Java is too much work :-)

 I am looking for scala based project that's using netty at its core
 (spray is one example).

 prediction.io is an option but that also looks quite complicated and not
 using all the ML features that got added in 1.3/1.4

 Velox built on top of ML / Keystone ML pipeline API and that's useful but
 it is still using javax servlets which is not netty based.

 On Sat, Jun 20, 2015 at 10:25 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Oops, that link was for Oryx 1. Here's the repo for Oryx 2:
 https://github.com/OryxProject/oryx

 On Sat, Jun 20, 2015 at 10:20 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Debasish,

 The Oryx project (https://github.com/cloudera/oryx), which is Apache 2
 licensed, contains a model server that can serve models built with MLlib.

 -Sandy

 On Sat, Jun 20, 2015 at 8:00 AM, Charles Earl charles.ce...@gmail.com
 wrote:

 Is velox NOT open source?


 On Saturday, June 20, 2015, Debasish Das debasish.da...@gmail.com
 wrote:

 Hi,

 The demo of end-to-end ML pipeline including the model server
 component at Spark Summit was really cool.

 I was wondering if the Model Server component is based upon Velox or
 it uses a completely different architecture.

 https://github.com/amplab/velox-modelserver

 We are looking for an open source version of model server to build
 upon.

 Thanks.
 Deb



 --
 - Charles






 --
 Donald Szeto
 PredictionIO



RE: Matrix Multiplication and mllib.recommendation

2015-06-17 Thread Nick Pentreath
One issue is that you broadcast the product vectors and then do a dot product 
one-by-one with the user vector.




You should try forming a matrix of the item vectors and doing the dot product 
as a matrix-vector multiply which will make things a lot faster.




Another optimisation that is available in 1.4 is a recommendProducts method 
that blockifies the factors to make use of level 3 BLAS (i.e. matrix-matrix 
multiply). I am not sure if this is available in the Python API yet. 




But you can do a version yourself by using mapPartitions over user factors, 
blocking the factors into sub-matrices and doing matrix multiply with item 
factor matrix to get scores on a block-by-block basis.
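
A rough sketch of that blocked approach with Breeze (it assumes a trained
MatrixFactorizationModel called model, a SparkContext sc, rank 20 and top-10
recs; the names and the block size are just for illustration):

import breeze.linalg.{DenseMatrix, argtopk}

val rank = 20
val itemFactors = model.productFeatures.collect()          // (itemId, factors)
val itemIds = itemFactors.map(_._1)
// rank x numItems matrix, one column per item
val itemMatrix = new DenseMatrix(rank, itemIds.length, itemFactors.flatMap(_._2))
val bcItems = sc.broadcast((itemIds, itemMatrix))

val topK = model.userFeatures.mapPartitions { users =>
  val (ids, im) = bcItems.value
  users.grouped(1000).flatMap { block =>                    // block of user factors
    val userMatrix = new DenseMatrix(rank, block.size, block.flatMap(_._2).toArray)
    val scores = userMatrix.t * im                          // blockSize x numItems
    block.zipWithIndex.map { case ((userId, _), i) =>
      val row = scores(i, ::).t
      (userId, argtopk(row, 10).map(j => (ids(j), row(j))))
    }
  }
}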




Also as Ilya says more parallelism can help. I don't think it's so necessary to 
do LSH with 30,000 items.



—
Sent from Mailbox

On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya
ilya.gane...@capitalone.com wrote:

 I actually talk about this exact thing in a blog post here 
 http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/.
  Keep in mind, you're actually doing a ton of math. Even with proper caching 
 and use of broadcast variables this will take a while depending on the size 
 of your cluster. To get real results you may want to look into locality 
 sensitive hashing to limit your search space and definitely look into 
 spinning up multiple threads to process your product features in parallel to 
 increase resource utilization on the cluster.
 Thank you,
 Ilya Ganelin
 -Original Message-
 From: afarahat [ayman.fara...@yahoo.commailto:ayman.fara...@yahoo.com]
 Sent: Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
 To: user@spark.apache.org
 Subject: Matrix Multiplication and mllib.recommendation
 Hello;
 I am trying to get predictions after running the ALS model.
 The model works fine. In the prediction/recommendation, I have about 
 30,000 products and 90 million users.
 When i try the predict all it fails.
 I have been trying to formulate the problem as a Matrix multiplication where
 I first get the product features, broadcast them and then do a dot product.
 Its still very slow. Any reason why
 here is a sample code
 def doMultiply(x):
 a = []
 #multiply by
 mylen = len(pf.value)
 for i in range(mylen) :
   myprod = numpy.dot(x,pf.value[i][1])
   a.append(myprod)
 return a
 myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
 #I need to select which products to broadcast but lets try all
 m1 = myModel.productFeatures().sample(False, 0.001)
 pf = sc.broadcast(m1.collect())
 uf = myModel.userFeatures()
 f1 = uf.map(lambda x : (x[0], doMultiply(x[1])))
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Matrix-Multiplication-and-mllib-recommendation-tp23384.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 The information contained in this e-mail is confidential and/or proprietary 
 to Capital One and/or its affiliates and may only be used solely in 
 performance of work or services for Capital One. The information transmitted 
 herewith is intended only for use by the individual or entity to which it is 
 addressed. If the reader of this message is not the intended recipient, you 
 are hereby notified that any review, retransmission, dissemination, 
 distribution, copying or other use of, or taking of any action in reliance 
 upon this information is strictly prohibited. If you have received this 
 communication in error, please contact the sender and delete the material 
 from your computer.

Re: ALS predictALL not completing

2015-06-17 Thread Nick Pentreath
So to be clear, you're trying to use the recommendProducts method of
MatrixFactorizationModel? I don't see predictAll in 1.3.1

1.4.0 has a more efficient method to recommend products for all users (or
vice versa):
https://github.com/apache/spark/blob/v1.4.0/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala#L152
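
In code that is simply (on a trained model, here asking for the top 10):

  val recs = model.recommendProductsForUsers(10)  // RDD[(Int, Array[Rating])]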



On Tue, Jun 16, 2015 at 4:30 PM, Ayman Farahat ayman.fara...@yahoo.com
wrote:

 This is 1.3.1

 Ayman Farahat
 --
 View my research on my SSRN Author page:
 http://ssrn.com/author=1594571

   --
  *From:* Nick Pentreath nick.pentre...@gmail.com
 *To:* user@spark.apache.org user@spark.apache.org
 *Sent:* Tuesday, June 16, 2015 4:23 AM
 *Subject:* Re: ALS predictALL not completing

 Which version of Spark are you using?

 On Tue, Jun 16, 2015 at 6:20 AM, afarahat ayman.fara...@yahoo.com wrote:

 Hello;
 I have a data set of about 80 Million users and 12,000 items (very sparse
 ).
 I can get the training part working no problem. (model has 20 factors),
 However, when I try using predict all for 80 million x 10 items, the job
 does not complete.
 When i use a smaller data set say 500k or a million it completes.
 Any ideas suggestions ?
 Thanks
 Ayman



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/ALS-predictALL-not-completing-tp23327.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: ALS predictALL not completing

2015-06-16 Thread Nick Pentreath
Which version of Spark are you using?

On Tue, Jun 16, 2015 at 6:20 AM, afarahat ayman.fara...@yahoo.com wrote:

 Hello;
 I have a data set of about 80 Million users and 12,000 items (very sparse
 ).
 I can get the training part working no problem. (model has 20 factors),
 However, when I try using predict all for 80 million x 10 items, the job
 does not complete.
 When i use a smaller data set say 500k or a million it completes.
 Any ideas suggestions ?
 Thanks
 Ayman



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/ALS-predictALL-not-completing-tp23327.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark job workflow engine recommendations

2015-08-11 Thread Nick Pentreath
I also tend to agree that Azkaban is somewhat easier to get set up. Though I 
haven't used the new UI for Oozie that is part of CDH, so perhaps that is 
another good option.




It's a pity Azkaban is a little rough in terms of documenting its API, and the 
scalability is an issue. However it would be possible to have a few different 
instances running for different use cases  / groups within the org perhaps



—
Sent from Mailbox

On Wed, Aug 12, 2015 at 12:14 AM, Vikram Kone vikramk...@gmail.com
wrote:

 Hi Lars, thanks for the brain dump. All the points you made about target 
 audience, degree of high availability and time-based scheduling instead of 
 event-based scheduling are all valid and make sense. In our case, most of our 
 devs are .NET based, so XML or web-based scheduling is preferred over 
 something written in Java/Scala/Python. Based on my research so far on the 
 available workflow managers today, Azkaban is the easiest to adopt since 
 it doesn't have any hard dependence on Hadoop and is easy to onboard and 
 schedule jobs. I was able to install and execute some spark workflows in a 
 day. Though the fact that it's being phased out at LinkedIn is troubling, I 
 think it's the best suited for our use case today. 
 Sent from Outlook
 On Sun, Aug 9, 2015 at 4:51 PM -0700, Lars Albertsson 
 lars.alberts...@gmail.com wrote:
 I used to maintain Luigi at Spotify, and got some insight in workflow
 manager characteristics and production behaviour in the process.
 I am evaluating options for my current employer, and the short list is
 basically: Luigi, Azkaban, Pinball, Airflow, and rolling our own. The
 latter is not necessarily more work than adapting an existing tool,
 since existing managers are typically more or less tied to the
 technology used by the company that created them.
 Are your users primarily developers building pipelines that drive
 data-intensive products, or are they analysts, producing business
 intelligence? These groups tend to have preferences for different
 types of tools and interfaces.
 I have a love/hate relationship with Luigi, but given your
 requirements, it is probably the best fit:
 * It has support for Spark, and it seems to be used and maintained.
 * It has no builtin support for Cassandra, but Cassandra is heavily
 used at Spotify. IIRC, the code required to support Cassandra targets
 is more or less trivial. There is no obvious single definition of a
 dataset in C*, so you'll have to come up with a convention and encode
 it as a Target subclass. I guess that is why it never made it outside
 Spotify.
 * The open source community is active and it is well tested in
 production at multiple sites.
 * It is easy to write dependencies, but in a Python DSL. If your users
 are developers, this is preferable over XML or a web interface. There
 are always quirks and odd constraints somewhere that require the
 expressive power of a programming language. It also allows you to
 create extensions without changing Luigi itself.
 * It does not have recurring scheduling bulitin. Luigi needs a motor
 to get going, typically cron, installed on a few machines for
 redundancy. In a typical pipeline scenario, you give output datasets a
 time parameter, which arranges for a dataset to be produced each
 hour/day/week/month.
 * It supports failure notifications.
 Pinball and Airflow have similar architecture to Luigi, with a single
 central scheduler and workers that submit and execute jobs. They seem
 to be more solidly engineered at a glance, but less battle tested
 outside Pinterest/Airbnb, and they have fewer integrations to the data
 ecosystem.
 Azkaban has a different architecture and user interface, and seems
 more geared towards data scientists than developers; it has a good UI
 for controlling jobs, but writing extensions and controlling it
 programmatically seems more difficult than for Luigi.
 All of the tools above are centralised, and the central component can
 become a bottleneck and a single point of problem. I am not aware of
 any decentralised open source workflow managers, but you can run
 multiple instances and shard manually.
 Regarding recurring jobs, it is typically undesirable to blindly run
 jobs at a certain time. If you run jobs, e.g. with cron, and process
 whatever data is available in your input sources, your jobs become
 indeterministic and unreliable. If incoming data is late or missing,
 your jobs will fail or create artificial skews in output data, leading
 to confusing results. Moreover, if jobs fail or have bugs, it will be
 difficult to rerun them and get predictable results. This is why I
 don't think Chronos is a meaningful alternative for scheduling data
 processing.
 There are different strategies on this topic, but IMHO, it is easiest
 create predictable and reliable pipelines by bucketing incoming data
 into datasets that you seal off, and mark ready for processing, and
 then use the workflow manager's DAG logic to process data when input

Re: Is there any tool that i can prove to customer that spark is faster then hive ?

2015-08-12 Thread Nick Pentreath
Perhaps you could time the end-to-end runtime for each pipeline, and each stage?
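
Even something as simple as this around each stage of both pipelines gives you
numbers to show (a sketch; the path and the stage are made up):

def timed[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  println(s"$label took ${(System.nanoTime() - start) / 1e9} seconds")
  result
}

val count = timed("load + count") {
  sc.textFile("hdfs:///data/sample").count()
}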




Though I'd be fairly confident that Spark will outperform Hive/Mahout on MR, 
that's not the only consideration - having everything on a single platform and 
the Spark / DataFrame API is a huge win just by itself









—
Sent from Mailbox

On Wed, Aug 12, 2015 at 1:45 PM, Ladle ladle.pa...@tcs.com wrote:

 Hi ,
 I have built the machine learning features and model using Apache Spark,
 and I have built the same features using Hive/Java and used Mahout to run the
 model.
 Now how can I show the customer that Apache Spark is faster than Hive?
 Is there any tool that shows the time?
 Regards,
 Ladle
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-any-tool-that-i-can-prove-to-customer-that-spark-is-faster-then-hive-tp24224.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

Re: RDD[Future[T]] = Future[RDD[T]]

2015-07-27 Thread Nick Pentreath
In this case, each partition will block until the futures in that partition
are completed.

If you are in the end collecting all the Futures to the driver, what is the
reasoning behind using an RDD? You could just use a bunch of Futures
directly.

If you want to do some processing on the results of the futures, then I'd
say you would need to block in each partition until the Futures' results
are completed, as I'm not at all sure whether Futures would be composable
across stage / task boundaries.



On Mon, Jul 27, 2015 at 9:33 AM, Ayoub benali.ayoub.i...@gmail.com wrote:

 do you mean something like this ?

 val values = rdd.mapPartitions { i: Iterator[Future[T]] =>
   val future: Future[Iterator[T]] = Future.sequence(i)
   Await.result(future, someTimeout)
 }


 Where is the blocking happening in this case? It seems to me that all the
 workers will be blocked until the future is completed, no ?

 2015-07-27 7:24 GMT+02:00 Nick Pentreath [hidden email]
 http:///user/SendEmail.jtp?type=nodenode=24005i=0:

 You could use Iterator.single on the future[iterator].

 However if you collect all the partitions I'm not sure if it will work
 across executor boundaries. Perhaps you may need to await the sequence of
 futures in each partition and return the resulting iterator.

 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Sun, Jul 26, 2015 at 10:43 PM, Ayoub Benali [hidden email]
 http:///user/SendEmail.jtp?type=nodenode=24005i=1 wrote:

 It doesn't work because mapPartitions expects a function f:(Iterator[T])
 ⇒ Iterator[U] while .sequence wraps the iterator in a Future

 2015-07-26 22:25 GMT+02:00 Ignacio Blasco [hidden email]
 http:///user/SendEmail.jtp?type=nodenode=24005i=2:

 Maybe using mapPartitions and .sequence inside it?
  El 26/7/2015 10:22 p. m., Ayoub [hidden email]
 http:///user/SendEmail.jtp?type=nodenode=24005i=3 escribió:

 Hello,

 I am trying to convert the result I get after doing some async IO :

 val rdd: RDD[T] = // some rdd

 val result: RDD[Future[T]] = rdd.map(httpCall)

 Is there a way collect all futures once they are completed in a *non
 blocking* (i.e. without scala.concurrent
 Await) and lazy way?

 If the RDD was a standard scala collection then calling
 scala.concurrent.Future.sequence would have resolved the issue but
 RDD is
 not a TraversableOnce (which is required by the method).

 Is there a way to do this kind of transformation with an
 RDD[Future[T]] ?

 Thanks,
 Ayoub.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Future-T-Future-RDD-T-tp24000.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: [hidden email]
 http:///user/SendEmail.jtp?type=nodenode=24005i=4
 For additional commands, e-mail: [hidden email]
 http:///user/SendEmail.jtp?type=nodenode=24005i=5





 --
 View this message in context: Re: RDD[Future[T]] = Future[RDD[T]]
 http://apache-spark-user-list.1001560.n3.nabble.com/Re-RDD-Future-T-Future-RDD-T-tp24005.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



Re: Velox Model Server

2015-07-13 Thread Nick Pentreath
Honestly I don't believe this kind of functionality belongs within
spark-jobserver.

For serving of factor-type models, you are typically in the realm of
recommendations or ad-serving scenarios - i.e. needing to score a user /
context against many possible items and return a top-k list of those. In
addition, filtering and search comes into play heavily - e.g. filter
recommendations by item category, by geo-location, by stock-level status,
by price / profit levels, by promoted / blocked content, etc etc. And the
requirements are typically real-time (i.e. a few hundred ms at the most).
So I think there are too many specialist requirements vs spark-jobserver.

In terms of general approach, your options are to:

(a) score first to get a list recs, and then filter / re-rank / apply
queries to further winnow that down. This typically means returning the top
L > K recs from the scoring, so that you have enough left after filtering.

(b) score and filter in the same step (or at least using the same engine).

Scoring is really the easiest part - modulo dealing with massive item-sets
which can be dealt with by (i) LSH / approx. nearest neighbour approaches
and/or (ii) partitioning / parallelization approaches.

We currently use approach (a) but are looking into (b) also.

I think the best idea is to pick one of the existing frameworks (whether
Oryx, Velox, PredictionIO, SeldonIO etc) that best suits your requirements,
and build around that. Or build something new of your own if you want to
use Akka.

I went with Scalatra because it is what our API layer is built with, and I
find the routing DSL much nicer vs Spray. Both have good Akka integration.
I don't think the front-end matters that much (Finatra is another option).
If you want Akka then maybe Spray / Akka HTTP is the best way to go.

Our current model server as I mentioned is very basic and a bit hacked
together, but it may have some useful ideas or serve as a starting point if
there is interest.



On Wed, Jun 24, 2015 at 5:46 PM, Debasish Das debasish.da...@gmail.com
wrote:

 Thanks Nick, Sean for the great suggestions...

 Since you guys have already hit these issues before I think it will be
 great if we can add the learning to Spark Job Server and enhance it for
 community.

 Nick, do you see any major issues in using Spray over Scalatra ?

 Looks like Model Server API layer needs access to a performant KV store
 (Redis/Memcached), Elastisearch (we used Solr before for item-item serving
 but I liked the Spark-Elastisearch integration, REST is Netty based unlike
 Solr's Jetty and YARN client looks more stable and so it is worthwhile to
 see if it improves over Solr based serving) and ML Models (which are moving
 towards Spark SQL style in 1.3/1.4 with the introduction of Pipeline API)

 An initial version of KV store might be simple LRU cache.

 For KV store are there any comparisons available with IndexedRDD and
 Redis/Memcached ?

 Velox is using CoreOS EtcdClient (which is Go based) but I am not sure if
 it is used as a full fledged distributed cache or not. May be it is being
 used as zookeeper alternative.


 On Wed, Jun 24, 2015 at 2:02 AM, Nick Pentreath nick.pentre...@gmail.com
 wrote:

 Ok

 My view is with only 100k items, you are better off serving in-memory
 for items vectors. i.e. store all item vectors in memory, and compute user
 * item score on-demand. In most applications only a small proportion of
 users are active, so really you don't need all 10m user vectors in memory.
 They could be looked up from a K-V store and have an LRU cache in memory
 for say 1m of those. Optionally also update them as feedback comes in.

 As far as I can see, this is pretty much what velox does except it
 partitions all user vectors across nodes to scale.

 Oryx does almost the same but Oryx1 kept all user and item vectors in
 memory (though I am not sure about whether Oryx2 still stores all user and
 item vectors in memory or partitions in some way).

 Deb, we are using a custom Akka-based model server (with Scalatra
 frontend). It is more focused on many small models in-memory (largest of
 these is around 5m user vectors, 100k item vectors, with factor size
 20-50). We use Akka cluster sharding to allow scale-out across nodes if
 required. We have a few hundred models comfortably powered by m3.xlarge AWS
 instances. Using floats you could probably have all of your factors in
 memory on one 64GB machine (depending on how many models you have).

 Our solution is not that generic and a little hacked-together - but I'd
 be happy to chat offline about sharing what we've done. I think it still
 has a basic client to the Spark JobServer which would allow triggering
 re-computation jobs periodically. We currently just run batch
 re-computation and reload factors from S3 periodically.

 We then use Elasticsearch to post-filter results and blend content-based
 stuff - which I think might be more efficient than SparkSQL for this
 particular purpose.

 On Wed, Jun 24

Re: thought experiment: use spark ML to real time prediction

2015-11-12 Thread Nick Pentreath
Yup, currently PMML export, or Java serialization, are the options
realistically available.

Though PMML may deter some, there are not many viable cross-platform
alternatives (with nearly as much coverage).

On Thu, Nov 12, 2015 at 1:42 PM, Sean Owen  wrote:

> This is all starting to sound a lot like what's already implemented in
> Java-based PMML parsing/scoring libraries like JPMML and OpenScoring. I'm
> not clear it helps a lot to reimplement this in Spark.
>
> On Thu, Nov 12, 2015 at 8:05 AM, Felix Cheung 
> wrote:
>
>> +1 on that. It would be useful to use the model outside of Spark.
>>
>>
>> _
>> From: DB Tsai 
>> Sent: Wednesday, November 11, 2015 11:57 PM
>> Subject: Re: thought experiment: use spark ML to real time prediction
>> To: Nirmal Fernando 
>> Cc: Andy Davidson , Adrian Tanase <
>> atan...@adobe.com>, user @spark 
>>
>>
>>
>> Do you think it will be useful to separate those models and model
>> loader/writer code into another spark-ml-common jar without any spark
>> platform dependencies so users can load the models trained by Spark ML in
>> their application and run the prediction?
>>
>>
>> Sincerely,
>>
>> DB Tsai
>> --
>> Web: https://www.dbtsai.com
>> PGP Key ID: 0xAF08DF8D
>>
>> On Wed, Nov 11, 2015 at 3:14 AM, Nirmal Fernando 
>> wrote:
>>
>>> As of now, we are basically serializing the ML model and then
>>> deserialize it for prediction at real time.
>>>
>>> On Wed, Nov 11, 2015 at 4:39 PM, Adrian Tanase 
>>> wrote:
>>>
 I don’t think this answers your question but here’s how you would
 evaluate the model in realtime in a streaming app

 https://databricks.gitbooks.io/databricks-spark-reference-applications/content/twitter_classifier/predict.html

 Maybe you can find a way to extract portions of MLLib and run them
 outside of spark – loading the precomputed model and calling .predict on
 it…

 -adrian

 From: Andy Davidson
 Date: Tuesday, November 10, 2015 at 11:31 PM
 To: "user @spark"
 Subject: thought experiment: use spark ML to real time prediction

 Lets say I have use spark ML to train a linear model. I know I can save
 and load the model to disk. I am not sure how I can use the model in a real
 time environment. For example I do not think I can return a “prediction” to
 the client using spark streaming easily. Also for some applications the
 extra latency created by the batch process might not be acceptable.

 If I was not using spark I would re-implement the model I trained in my
 batch environment in a lang like Java  and implement a rest service that
 uses the model to create a prediction and return the prediction to the
 client. Many models make predictions using linear algebra. Implementing
 predictions is relatively easy if you have a good vectorized LA package. Is
 there a way to use a model I trained using spark ML outside of spark?

 As a motivating example, even if its possible to return data to the
 client using spark streaming. I think the mini batch latency would not be
 acceptable for a high frequency stock trading system.

 Kind regards

 Andy

 P.s. The examples I have seen so far use spark streaming to
 “preprocess” predictions. For example a recommender system might use what
 current users are watching to calculate “trending recommendations”. These
 are stored on disk and served up to users when the use the “movie guide”.
 If a recommendation was a couple of min. old it would not effect the end
 users experience.


>>>
>>>
>>> --
>>>
>>> Thanks & regards,
>>> Nirmal
>>>
>>> Team Lead - WSO2 Machine Learner
>>> Associate Technical Lead - Data Technologies Team, WSO2 Inc.
>>> Mobile: +94715779733
>>> Blog: http://nirmalfdo.blogspot.com/
>>>
>>>
>>>
>>
>>
>>
>


Re: DynamoDB Connector?

2015-11-16 Thread Nick Pentreath
See this thread for some info:
http://apache-spark-user-list.1001560.n3.nabble.com/DynamoDB-input-source-td8814.html

I don't think the situation has changed that much - if you're using Spark
on EMR, then I think the InputFormat is available in a JAR (though I
haven't tested that). Otherwise you'll need to try to get the JAR and see
if you can get it to work outside of EMR.

I'm afraid this thread (
https://forums.aws.amazon.com/thread.jspa?threadID=168506) does not appear
encouraging, even for using Spark on EMR to read from DynamoDB using
the InputFormat! It's a pity AWS doesn't open source the InputFormat.

On Mon, Nov 16, 2015 at 5:00 AM, Charles Cobb 
wrote:

> Hi,
>
> What is the best practice for reading from DynamoDB from Spark? I know I
> can use the Java API, but this doesn't seem to take data locality into
> consideration at all.
>
> I was looking for something along the lines of the cassandra connector:
> https://github.com/datastax/spark-cassandra-connector
>
> Thanks,
> CJ
>
>


Re: Machine learning with spark (book code example error)

2015-10-14 Thread Nick Pentreath
Hi there. I'm the author of the book (thanks for buying it by the way :)

Ideally if you're having any trouble with the book or code, it's best to
contact the publisher and submit a query (
https://www.packtpub.com/books/content/support/17400)

However, I can help with this issue. The problem is that the "testLabels"
code needs to be indented over multiple lines:

val testPath = "/PATH/20news-bydate-test/*"
val testRDD = sc.wholeTextFiles(testPath)
val testLabels = testRDD.map { case (file, text) =>
val topic = file.split("/").takeRight(2).head
newsgroupsMap(topic)
}

As it is in the sample code attached. If you copy the whole indented block
(or line by line) into the console, it should work - I've tested all the
sample code again and indeed it works for me.

Hope this helps
Nick

On Tue, Oct 13, 2015 at 8:31 PM, Zsombor Egyed 
wrote:

> Hi!
>
> I was reading the ML with spark book, and I was very interested about the
> 9. chapter (text mining), so I tried code examples.
>
> Everything was fine, but in this line:
>
> val testLabels = testRDD.map {
>
> case (file, text) => val topic = file.split("/").takeRight(2).head
>
> newsgroupsMap(topic) }
>
> I got an error: "value newsgroupsMap is not a member of String"
>
> Other relevant part of the code:
> val path = "/PATH/20news-bydate-train/*"
> val rdd = sc.wholeTextFiles(path)
> val newsgroups = rdd.map { case (file, text) =>
> file.split("/").takeRight(2).head }
>
> val tf = hashingTF.transform(tokens)
> val idf = new IDF().fit(tf)
> val tfidf = idf.transform(tf)
>
> val newsgroupsMap = newsgroups.distinct.collect().zipWithIndex.toMap
> val zipped = newsgroups.zip(tfidf)
> val train = zipped.map { case (topic, vector)
> =>LabeledPoint(newsgroupsMap(topic), vector) }
> train.cache
>
> val model = NaiveBayes.train(train, lambda = 0.1)
>
> val testPath = "/PATH//20news-bydate-test/*"
> val testRDD = sc.wholeTextFiles(testPath)
> val testLabels = testRDD.map { case (file, text) => val topic =
> file.split("/").takeRight(2).head newsgroupsMap(topic) }
>
> I attached the whole program code.
> Can anyone help, what the problem is?
>
> Regards,
> Zsombor
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>


Re: How to specify the numFeatures in HashingTF

2015-10-15 Thread Nick Pentreath
Setting numFeatures higher than the vocab size will tend to reduce the chance 
of hash collisions, but it's not strictly necessary - it becomes a memory / 
accuracy trade off.





Surprisingly, the impact on model performance of moderate hash collisions is 
often not significant.






So it may be worth trying a few settings out (lower than vocab, higher etc) and 
see what the impact is on evaluation metrics.
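
For example (a sketch; the documents and the evaluation step are placeholders):

import org.apache.spark.mllib.feature.HashingTF

val docs = sc.parallelize(Seq("the quick brown fox", "spark hashing tf example"))
  .map(_.split(" ").toSeq)

for (numFeatures <- Seq(1 << 10, 1 << 16, 1 << 20)) {
  val tf = new HashingTF(numFeatures).transform(docs)
  // train a model on tf and compare evaluation metrics across settings
  println(s"numFeatures=$numFeatures, vector size=${tf.first().size}")
}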



—
Sent from Mailbox

On Thu, Oct 15, 2015 at 5:46 PM, Jianguo Li 
wrote:

> Hi,
> There is a parameter in the HashingTF called "numFeatures". I was wondering
> what is the best way to set the value to this parameter. In the use case of
> text categorization, do you need to know in advance the number of words in
> your vocabulary? or do you set it to be a large value, greater than the
> number of words in your vocabulary?
> Thanks,
> Jianguo

Re: Spark job workflow engine recommendations

2015-10-07 Thread Nick Pentreath
We're also using Azkaban for scheduling, and we simply use spark-submit via 
shell scripts. It works fine.




The auto retry feature with a large number of retries (like 100 or 1000 
perhaps) should take care of long-running jobs with restarts on failure. We 
haven't used it for streaming yet though we have long-running jobs and Azkaban 
won't kill them unless an SLA is in place.
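
For reference, a command-type job for this looks roughly like the following
.job file (the class, master URL and retry values are made up; retries and
retry.backoff are the standard Azkaban retry settings):

  # myjob.job
  type=command
  command=spark-submit --class com.example.MyJob --master spark://master-host:7077 /jobs/myjob.jar
  retries=100
  retry.backoff=60000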









—
Sent from Mailbox

On Wed, Oct 7, 2015 at 7:18 PM, Vikram Kone <vikramk...@gmail.com> wrote:

> Hien,
> I saw this pull request and from what I understand this is geared towards
> running spark jobs over hadoop. We are using spark over cassandra and not
> sure if this new jobtype supports that. I haven't seen any documentation in
> regards to how to use this spark job plugin, so that I can test it out on
> our cluster.
> We are currently submitting our spark jobs using command job type using the
> following command  "dse spark-submit --class com.org.classname ./test.jar"
> etc. What would be the advantage of using the native spark job type over
> command job type?
> I didn't understand from your reply if azkaban already supports long
> running jobs like spark streaming..does it? streaming jobs generally need
> to be running indefinitely or forever and needs to be restarted if for some
> reason they fail (lack of resources may be..). I can probably use the auto
> retry feature for this, but not sure
> I'm looking forward to the multiple executor support which will greatly
> enhance the scalability issue.
> On Wed, Oct 7, 2015 at 9:56 AM, Hien Luu <h...@linkedin.com> wrote:
>> The spark job type was added recently - see this pull request
>> https://github.com/azkaban/azkaban-plugins/pull/195.  You can leverage
>> the SLA feature to kill a job if it ran longer than expected.
>>
>> BTW, we just solved the scalability issue by supporting multiple
>> executors.  Within a week or two, the code for that should be merged in the
>> main trunk.
>>
>> Hien
>>
>> On Tue, Oct 6, 2015 at 9:40 PM, Vikram Kone <vikramk...@gmail.com> wrote:
>>
>>> Does Azkaban support scheduling long running jobs like spark steaming
>>> jobs? Will Azkaban kill a job if it's running for a long time.
>>>
>>>
>>> On Friday, August 7, 2015, Vikram Kone <vikramk...@gmail.com> wrote:
>>>
>>>> Hien,
>>>> Is Azkaban being phased out at linkedin as rumored? If so, what's
>>>> linkedin going to use for workflow scheduling? Is there something else
>>>> that's going to replace Azkaban?
>>>>
>>>> On Fri, Aug 7, 2015 at 11:25 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> In my opinion, choosing some particular project among its peers should
>>>>> leave enough room for future growth (which may come faster than you
>>>>> initially think).
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Fri, Aug 7, 2015 at 11:23 AM, Hien Luu <h...@linkedin.com> wrote:
>>>>>
>>>>>> Scalability is a known issue due to the current architecture.
>>>>>> However this will be applicable if you run more than 20K jobs per day.
>>>>>>
>>>>>> On Fri, Aug 7, 2015 at 10:30 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> From what I heard (an ex-coworker who is Oozie committer), Azkaban
>>>>>>> is being phased out at LinkedIn because of scalability issues (though
>>>>>>> UI-wise, Azkaban seems better).
>>>>>>>
>>>>>>> Vikram:
>>>>>>> I suggest you do more research in related projects (maybe using their
>>>>>>> mailing lists).
>>>>>>>
>>>>>>> Disclaimer: I don't work for LinkedIn.
>>>>>>>
>>>>>>> On Fri, Aug 7, 2015 at 10:12 AM, Nick Pentreath <
>>>>>>> nick.pentre...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Vikram,
>>>>>>>>
>>>>>>>> We use Azkaban (2.5.0) in our production workflow scheduling. We
>>>>>>>> just use local mode deployment and it is fairly easy to set up. It is
>>>>>>>> pretty easy to use and has a nice scheduling and logging interface, as 
>>>>>>>> well
>>>>>>>> as SLAs (like kill job and notify if it doesn't complete in 3 hours or
>>>>>>>> whatever).
>>>>>>>>
>>

Re: thought experiment: use spark ML to real time prediction

2015-11-17 Thread Nick Pentreath
I think the issue with pulling in all of spark-core is often with
dependencies (and versions) conflicting with the web framework (or Akka in
many cases). Plus it really is quite heavy if you just want a fairly
lightweight model-serving app. For example we've built a fairly simple but
scalable ALS factor model server on Scalatra, Akka and Breeze. So all you
really need is the web framework and Breeze (or an alternative linear
algebra lib).
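
Roughly, the scoring core of such a server boils down to something like this (a toy sketch with hard-coded factors, not our actual code - in practice the factor maps are loaded from the output of an ALS job):

import breeze.linalg.DenseVector

// Toy in-memory factor maps keyed by user / item id.
val userFactors = Map(1 -> DenseVector(0.1, 0.3, 0.5))
val itemFactors = Map(
  10 -> DenseVector(0.2, 0.1, 0.4),
  11 -> DenseVector(0.9, 0.0, 0.1))

// Score each item for a user via the dot product of factor vectors and return the top k.
def recommend(user: Int, k: Int): Seq[(Int, Double)] =
  itemFactors.toSeq
    .map { case (item, vec) => (item, userFactors(user) dot vec) }
    .sortBy(-_._2)
    .take(k)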

I definitely hear the pain-point that PMML might not be able to handle some
types of transformations or models that exist in Spark. However, here's an
example from scikit-learn -> PMML that may be instructive (
https://github.com/scikit-learn/scikit-learn/issues/1596 and
https://github.com/jpmml/jpmml-sklearn), where a fairly impressive list of
estimators and transformers are supported (including e.g. scaling and
encoding, and PCA).

I definitely think the current model I/O and "export" or "deploy to
production" situation needs to be improved substantially. However, you are
left with the following options:

(a) build out a lightweight "spark-ml-common" project that brings in the
dependencies needed for production scoring / transformation in independent
apps. However, here you only support Scala/Java - what about R and Python?
Also, what about the distributed models? Perhaps "local" wrappers can be
created, though this may not work for very large factor or LDA models. See
also H20 example http://docs.h2o.ai/h2oclassic/userguide/scorePOJO.html

(b) build out Spark's PMML support, and add missing stuff to PMML where
possible. The benefit here is an existing standard with various tools for
scoring (via REST server, Java app, Pig, Hive, various language support).

(c) build out a more comprehensive I/O, serialization and scoring
framework. Here you face the issue of supporting various predictors and
transformers generically, across platforms and versioning. i.e. you're
re-creating a new standard like PMML

Option (a) is do-able, but I'm a bit concerned that it may be too "Spark
specific", or even too "Scala / Java" specific. But it is still potentially
very useful to Spark users to build this out and have a somewhat standard
production serving framework and/or library (there are obviously existing
options like PredictionIO etc).

Option (b) is really building out the existing PMML support within Spark,
so a lot of the initial work has already been done. I know some folks had
(or have) licensing issues with some components of JPMML (e.g. the
evaluator and REST server). But perhaps the solution here is to build an
Apache2-licensed evaluator framework.

Option (c) is obviously interesting - "let's build a better PMML (that uses
JSON or whatever instead of XML!)". But it also seems like a huge amount of
reinventing the wheel, and like any new standard would take time to garner
wide support (if at all).

It would be really useful to start to understand what the main missing
pieces are in PMML - perhaps the lowest-hanging fruit is simply to
contribute improvements or additions to PMML.
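
For reference on (b), the export side that already exists in MLlib (since 1.4) covers a handful of model types via the PMMLExportable trait - a quick sketch, assuming a spark-shell style context where sc is available:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.parallelize(Seq(
  Vectors.dense(0.0, 0.0), Vectors.dense(1.0, 1.0),
  Vectors.dense(9.0, 8.0), Vectors.dense(8.0, 9.0)))

val model = KMeans.train(data, k = 2, maxIterations = 20)

// Export as a PMML XML string, or write it straight to a local path.
println(model.toPMML())
model.toPMML("/tmp/kmeans.pmml")

The gap is everything that isn't covered - e.g. ALS factor models and most of the ml Pipeline transformers - which is where contributing additions to PMML would come in.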



On Fri, Nov 13, 2015 at 11:46 AM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> That may not be an issue if the app using the models runs by itself (not
> bundled into an existing app), which may actually be the right way to
> design it considering separation of concerns.
>
> Regards
> Sab
>
> On Fri, Nov 13, 2015 at 9:59 AM, DB Tsai  wrote:
>
>> This will bring the whole dependencies of spark will may break the web
>> app.
>>
>>
>> Sincerely,
>>
>> DB Tsai
>> --
>> Web: https://www.dbtsai.com
>> PGP Key ID: 0xAF08DF8D
>>
>> On Thu, Nov 12, 2015 at 8:15 PM, Nirmal Fernando  wrote:
>>
>>>
>>>
>>> On Fri, Nov 13, 2015 at 2:04 AM, darren  wrote:
>>>
 I agree 100%. Making the model requires large data and many cpus.

 Using it does not.

 This is a very useful side effect of ML models.

 If mlib can't use models outside spark that's a real shame.

>>>
>>> Well you can as mentioned earlier. You don't need Spark runtime for
>>> predictions, save the serialized model and deserialize to use. (you need
>>> the Spark Jars in the classpath though)
>>>


 Sent from my Verizon Wireless 4G LTE smartphone


  Original message 
 From: "Kothuvatiparambil, Viju" <
 viju.kothuvatiparam...@bankofamerica.com>
 Date: 11/12/2015 3:09 PM (GMT-05:00)
 To: DB Tsai , Sean Owen 
 Cc: Felix Cheung , Nirmal Fernando <
 nir...@wso2.com>, Andy Davidson ,
 Adrian Tanase , "user @spark" ,
 Xiangrui Meng , hol...@pigscanfly.ca
 Subject: RE: thought experiment: use spark ML to real time prediction

 I am glad to see DB’s comments, 

Re: Spark works with the data in another cluster(Elasticsearch)

2015-08-25 Thread Nick Pentreath
While it's true locality might speed things up, I'd say it's a very bad idea to 
mix your Spark and ES clusters - if your ES cluster is serving production 
queries (and in particular using aggregations), you'll run into performance 
issues on your production ES cluster.




ES-Hadoop uses ES scan and scroll to pull data pretty efficiently, so pulling it 
across the network is not too bad. If you do need to avoid that, pull the data 
and write what you need to HDFS as, say, Parquet files (e.g. pull data daily and 
write it, then you have all the data available on your Spark cluster).




And of course ensure that when you do pull data from ES to Spark, you cache it 
to avoid hitting the network again.
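
A sketch of the "pull once, snapshot to Parquet, then work off the local copy" approach using es-hadoop's Spark SQL integration (the host, index and paths below are placeholders):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Pull the day's data from the ES cluster once...
val esDF = sqlContext.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "es-host:9200")
  .load("myindex/mytype")

// ...write a Parquet snapshot onto the Spark cluster's HDFS...
esDF.write.parquet("hdfs:///data/myindex/2015-08-25")

// ...and run the analysis against the local copy, cached, so ES isn't hit again.
val df = sqlContext.read.parquet("hdfs:///data/myindex/2015-08-25").cache()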



—
Sent from Mailbox

On Tue, Aug 25, 2015 at 12:01 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 If the data is local to the machine then obviously it will be faster
 compared to pulling it through the network and storing it locally (either
 memory or disk etc). Have a look at the data locality
 http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/data_locality.html
 .
 Thanks
 Best Regards
 On Tue, Aug 18, 2015 at 8:09 PM, gen tang gen.tan...@gmail.com wrote:
 Hi,

 Currently, I have my data in the cluster of Elasticsearch and I try to use
 spark to analyse those data.
 The cluster of Elasticsearch and the cluster of spark are two different
 clusters. And I use hadoop input format(es-hadoop) to read data in ES.

 I am wondering how this environment affect the speed of analysis.
 If I understand well, spark will read data from ES cluster and do
 calculate on its own cluster(include writing shuffle result on its own
 machine), Is this right? If this is correct, I think that the performance
 will just a little bit slower than the data stored on the same cluster.

 I will be appreciated if someone can share his/her experience about using
 spark with elasticsearch.

 Thanks a lot in advance for your help.

 Cheers
 Gen


Re: Spark ANN

2015-09-07 Thread Nick Pentreath
Haven't checked the actual code, but that doc says "MLPC employes backpropagation 
for learning the model ..."?
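
Usage-wise it looks like this in 1.5 (a sketch - "train" and "test" are assumed to be DataFrames with "features" and "label" columns):

import org.apache.spark.ml.classification.MultilayerPerceptronClassifier

// Layers: 4 inputs, two hidden layers of 5 and 4 units, 3 output classes.
val layers = Array[Int](4, 5, 4, 3)

val trainer = new MultilayerPerceptronClassifier()
  .setLayers(layers)
  .setBlockSize(128)
  .setSeed(1234L)
  .setMaxIter(100)

val model = trainer.fit(train)          // weights are learned with backpropagation
val predictions = model.transform(test)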










—
Sent from Mailbox

On Mon, Sep 7, 2015 at 8:18 PM, Ruslan Dautkhanov 
wrote:

> http://people.apache.org/~pwendell/spark-releases/latest/ml-ann.html
> Implementation seems missing backpropagation?
> Was there is a good reason to omit BP?
> What are the drawbacks of a pure feedforward-only ANN?
> Thanks!
> -- 
> Ruslan Dautkhanov

Re: What is the best way to migrate existing scikit-learn code to PySpark?

2015-09-12 Thread Nick Pentreath
You might want to check out https://github.com/lensacom/sparkit-learn





Though it's true that for random forests / trees you will need to use MLlib.



—
Sent from Mailbox

On Sat, Sep 12, 2015 at 9:00 PM, Jörn Franke  wrote:

> I fear you have to do the plumbing all yourself. This is the same for all
> commercial and non-commercial libraries/analytics packages. It often also
> depends on the functional requirements on how you distribute.
> Le sam. 12 sept. 2015 à 20:18, Rex X  a écrit :
>> Hi everyone,
>>
>> What is the best way to migrate existing scikit-learn code to PySpark
>> cluster? Then we can bring together the full power of both scikit-learn and
>> spark, to do scalable machine learning. (I know we have MLlib. But the
>> existing code base is big, and some functions are not fully supported yet.)
>>
>> Currently I use multiprocessing module of Python to boost the speed. But
>> this only works for one node, while the data set is small.
>>
>> For many real cases, we may need to deal with gigabytes or even terabytes
>> of data, with thousands of raw categorical attributes, which can lead to
>> millions of discrete features, using 1-of-k representation.
>>
>> For these cases, one solution is to use distributed memory. That's why I
>> am considering spark. And spark support Python!
>> With Pyspark, we can import scikit-learn.
>>
>> But the question is how to make the scikit-learn code, decisionTree
>> classifier for example, running in distributed computing mode, to benefit
>> the power of Spark?
>>
>>
>> Best,
>> Rex
>>

Re: What is the best way to migrate existing scikit-learn code to PySpark?

2015-09-13 Thread Nick Pentreath
I should point out that I'm not sure what the performance of that project is.




I'd expect that native DataFrames in PySpark will be significantly more 
efficient than their DictRDD.




It would be interesting to see a performance comparison for the pipelines 
relative to native Spark ML pipelines, if you do test both out.



—
Sent from Mailbox

On Sat, Sep 12, 2015 at 10:52 PM, Rex X <dnsr...@gmail.com> wrote:

> Jorn and Nick,
> Thanks for answering.
> Nick, the sparkit-learn project looks interesting. Thanks for mentioning it.
> Rex
> On Sat, Sep 12, 2015 at 12:05 PM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>> You might want to check out https://github.com/lensacom/sparkit-learn
>> <https://github.com/lensacom/sparkit-learn/blob/master/README.rst>
>>
>> Though it's true for random
>> Forests / trees you will need to use MLlib
>>
>> —
>> Sent from Mailbox <https://www.dropbox.com/mailbox>
>>
>>
>> On Sat, Sep 12, 2015 at 9:00 PM, Jörn Franke <jornfra...@gmail.com> wrote:
>>
>>> I fear you have to do the plumbing all yourself. This is the same for all
>>> commercial and non-commercial libraries/analytics packages. It often also
>>> depends on the functional requirements on how you distribute.
>>>
>>> Le sam. 12 sept. 2015 à 20:18, Rex X <dnsr...@gmail.com> a écrit :
>>>
>>>> Hi everyone,
>>>>
>>>> What is the best way to migrate existing scikit-learn code to PySpark
>>>> cluster? Then we can bring together the full power of both scikit-learn and
>>>> spark, to do scalable machine learning. (I know we have MLlib. But the
>>>> existing code base is big, and some functions are not fully supported yet.)
>>>>
>>>> Currently I use multiprocessing module of Python to boost the speed. But
>>>> this only works for one node, while the data set is small.
>>>>
>>>> For many real cases, we may need to deal with gigabytes or even
>>>> terabytes of data, with thousands of raw categorical attributes, which can
>>>> lead to millions of discrete features, using 1-of-k representation.
>>>>
>>>> For these cases, one solution is to use distributed memory. That's why I
>>>> am considering spark. And spark support Python!
>>>> With Pyspark, we can import scikit-learn.
>>>>
>>>> But the question is how to make the scikit-learn code, decisionTree
>>>> classifier for example, running in distributed computing mode, to benefit
>>>> the power of Spark?
>>>>
>>>>
>>>> Best,
>>>> Rex
>>>>
>>>
>>
