Spark Streaming - Design considerations/Knobs

2015-05-20 Thread Hemant Bhanawat
Hi,

I have compiled a list (from online sources) of knobs/design considerations
that need to be taken care of by applications running on Spark Streaming.
Is my understanding correct? Are there any other important design considerations
that I should take care of?


   - A DStream is associated with a single receiver. To attain read
   parallelism, multiple receivers (i.e. multiple DStreams) need to be created
   (see the sketch after this list).
   - A receiver runs within an executor and occupies one core. Ensure
   that enough cores remain for processing after the receiver slots are booked,
   i.e. spark.cores.max should take the receiver slots into account.
   - Receivers are allocated to executors in a round-robin fashion.
   - When data is received from a stream source, the receiver creates blocks of
   data. A new block of data is generated every blockInterval milliseconds, so N
   blocks of data are created during the batchInterval, where N =
   batchInterval/blockInterval.
   - These blocks are distributed by the BlockManager of the current
   executor to the block managers of other executors. After that, the Network
   Input Tracker running on the driver is informed about the block locations
   for further processing.
   - An RDD is created on the driver for the blocks created during the
   batchInterval. The blocks generated during the batchInterval are the partitions
   of that RDD, and each partition is a task in Spark. blockInterval ==
   batchInterval would mean that a single partition is created and it is
   probably processed locally.
   - A bigger blockInterval means bigger blocks. A high value of
   spark.locality.wait increases the chance of processing a block on the local
   node. A balance needs to be found between these two parameters to
   ensure that the bigger blocks are processed locally.
   - Instead of relying on batchInterval and blockInterval, you can define
   the number of partitions by calling dstream.repartition(n). This reshuffles
   the data in the RDD randomly to create n partitions.
   - An RDD's processing is scheduled by the driver's JobScheduler as a job. At
   a given point of time only one job is active, so if one job is executing
   the other jobs are queued.
   - If you have two DStreams there will be two RDDs formed, and two jobs
   will be created which are scheduled one after the other.
   - To avoid this, you can union the two DStreams. This ensures that a
   single UnionRDD is formed from the two RDDs of the DStreams. That UnionRDD
   is then processed as a single job. The partitioning of the RDDs is
   not affected, however.
   - If the batch processing time is more than the batchInterval, the
   receiver's memory will start filling up and this will eventually end in
   exceptions (most probably a BlockNotFoundException). Currently there is no
   way to pause the receiver.
   - To be fully fault tolerant, Spark Streaming needs checkpointing to be
   enabled. Checkpointing increases the batch processing time.
   - The frequency of metadata checkpoint cleaning can be controlled using
   spark.cleaner.ttl, but data checkpoint cleaning happens automatically when
   the RDDs in the checkpoint are no longer required.
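
To make several of the points above concrete, here is a hedged, REPL-style sketch of read
parallelism via multiple receivers, unioning the resulting DStreams, and repartitioning
(the Kafka source, connection string, topic, intervals, and partition counts are
illustrative assumptions, and spark-streaming-kafka must be on the classpath):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("streaming-knobs-sketch")
  .set("spark.streaming.blockInterval", "200")    // blockInterval in ms (200 is the default)
val ssc = new StreamingContext(conf, Seconds(10)) // batchInterval = 10 seconds

// Several receivers => several input DStreams, for read parallelism.
// Each receiver occupies one executor core.
val streams = (1 to 3).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost:2181", "demo-group", Map("events" -> 1))
}

// Union the streams so each batch produces a single job over one unioned RDD.
val unioned = ssc.union(streams)

// Optionally override the blockInterval-derived partitioning (this causes a shuffle).
val repartitioned = unioned.repartition(12)

repartitioned.count().print()
ssc.start()
ssc.awaitTermination()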



Thanks,
Hemant


userClassPathFirst and loader constraint violation

2015-05-20 Thread Sean Owen
(Marcelo you might have some insight on this one)

Warning: this may just be because I'm doing something non-standard --
trying to embed Spark in a Java app and feed it all the classpath it
needs manually. But this was surprising enough that I wanted to ask.

I have an app that includes among other things SLF4J. I have set
spark.{driver,executor}.userClassPathFirst to true. If I run it and
let it start a Spark job, it quickly fails with:

2015-05-20 04:35:01,747 WARN  TaskSetManager:71 Lost task 0.0 in stage
0.0 (TID 0, x.cloudera.com): java.lang.LinkageError: loader constraint
violation: loader (instance of
org/apache/spark/util/ChildFirstURLClassLoader) previously initiated
loading for a different type with name org/slf4j/Logger
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at 
org.apache.spark.util.ChildFirstURLClassLoader.liftedTree1$1(MutableURLClassLoader.scala:74)
at 
org.apache.spark.util.ChildFirstURLClassLoader.loadClass(MutableURLClassLoader.scala:73)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
...

I can see that this class was loaded from my app JAR:

[Loaded org.slf4j.Logger from file:/home/sowen/oryx-batch-2.0.0-SNAPSHOT.jar]

I'm assuming it's also loaded in some Spark classloader.
Tracing the code, I don't see that it ever gets to consulting any
other classloader; this happens during its own child-first attempt to
load the class.

This didn't happen in 1.2, FWIW, when the implementation was
different, but that's only to say it was different, not correct.

Anyone have thoughts on what this indicates? Is it something to be expected,
or surprising?

I think that disabling userClassPathFirst gets rid of this of course,
although that may cause other issues later.
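
For readers following along, a minimal sketch of the configuration in question (nothing
new here; these are the two properties named above):

import org.apache.spark.SparkConf

// Prefer classes from the user's jars over Spark's own, on both driver and executors.
val conf = new SparkConf()
  .set("spark.driver.userClassPathFirst", "true")
  .set("spark.executor.userClassPathFirst", "true")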




Regarding Connecting spark to Mesos documentation

2015-05-20 Thread Meethu Mathew

Hi List,

In the documentation of "Connecting Spark to Mesos"
(http://spark.apache.org/docs/latest/running-on-mesos.html#connecting-spark-to-mesos),
is it possible to modify and write in detail the step "Create a binary
package using make-distribution.sh --tgz"? When we use a custom-compiled
version of Spark, we mostly specify a Hadoop version (which is not the
default one). In this case, make-distribution.sh should be supplied the
same Maven options we used for building Spark. This is not specified in
the documentation. Please correct me if I am wrong.


Regards,
Meethu Mathew


Re: [Catalyst] RFC: Using PartialFunction literals instead of objects

2015-05-20 Thread Edoardo Vacchi
Thanks for the prompt feedback; I have further expanded on your
suggestions on this JIRA
https://issues.apache.org/jira/browse/SPARK-7754

On Tue, May 19, 2015 at 8:35 PM, Michael Armbrust
mich...@databricks.com wrote:
 Overall this seems like a reasonable proposal to me.  Here are a few
 thoughts:

  - There is some debugging utility to the ruleName, so we would probably
 want to at least make that an argument to the rule function (see the sketch
 after this list).
  - We also have had rules that operate on SparkPlan, though since there is
 only one ATM maybe we don't need sugar there.
  - I would not call the sugar for creating Strategies rule/seqrule, as I
 think the one-to-one vs one-to-many distinction is useful.
  - I'm generally pro-refactoring to make the code nicer, especially when it's
 not official public API, but I do think it's important to maintain source
 compatibility (which I think you are doing) when possible, as there are other
 projects using Catalyst.
  - Finally, we'll have to balance this with other code changes / conflicts.
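
Regarding the ruleName point above, a hedged, REPL-style sketch of passing it explicitly
(the helper name and signature are illustrative, not an agreed-upon API):

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

// A variant of the proposed helper that keeps a meaningful ruleName for debugging.
def rule(name: String)(pf: PartialFunction[LogicalPlan, LogicalPlan]): Rule[LogicalPlan] =
  new Rule[LogicalPlan] {
    override val ruleName: String = name
    def apply(plan: LogicalPlan): LogicalPlan = plan transform pf
  }

// Usage; the case shown is an identity rewrite, purely illustrative.
val myRule = rule("MyRewriteRule") {
  case p: Project => p
}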

 You should probably open a JIRA and we can continue the discussion there.

 On Tue, May 19, 2015 at 4:16 AM, Edoardo Vacchi uncommonnonse...@gmail.com
 wrote:

 Hi everybody,

 At the moment, Catalyst rules are defined using two different types of
 rules:
 `Rule[LogicalPlan]` and `Strategy` (which in turn maps to
 `GenericStrategy[SparkPlan]`).

 I propose to introduce utility methods to

   a) reduce the boilerplate to define rewrite rules
   b) turn them back into what they essentially represent: function
 types.

 These changes would be backwards compatible, and would greatly help in
 understanding what the code does. Personally, I feel like the current
 use of objects is redundant and possibly confusing.

 ## `Rule[LogicalPlan]`

 The analyzer and optimizer use `Rule[LogicalPlan]`, which, besides
 defining a default `val ruleName`, only defines the method
 `apply(plan: TreeType): TreeType`. Because the body of such a method is
 always supposed to read `plan transform pf`, with `pf` being some
 `PartialFunction[LogicalPlan, LogicalPlan]`, we can conclude that
 `Rule[LogicalPlan]` might be substituted by a PartialFunction.

 I propose the following:

 a) Introduce the utility method

 def rule(pf: PartialFunction[LogicalPlan, LogicalPlan]): Rule[LogicalPlan] =
   new Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan transform pf
   }

 b) progressively replace the boilerplate-y object definitions; e.g.

 object MyRewriteRule extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case ... => ...
   }
 }

 with

 // define a Rule[LogicalPlan]
 val MyRewriteRule = rule {
   case ... => ...
 }

 it might also be possible to make the rule method `implicit`, thereby
 further reducing MyRewriteRule to:

 // define a PartialFunction[LogicalPlan, LogicalPlan]
 // the implicit would convert it into a Rule[LogicalPlan] at the use sites
 val MyRewriteRule = {
   case ... => ...
 }
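
 For concreteness, a hedged, REPL-style sketch of what that implicit variant could look
 like (the rewrite shown is an identity case, purely illustrative):

import scala.language.implicitConversions
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

implicit def rule(pf: PartialFunction[LogicalPlan, LogicalPlan]): Rule[LogicalPlan] =
  new Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform pf
  }

// The partial function is written on its own...
val myRewrite: PartialFunction[LogicalPlan, LogicalPlan] = {
  case p: Project => p  // identity rewrite, purely illustrative
}

// ...and the implicit converts it wherever a Rule[LogicalPlan] is expected.
val asRule: Rule[LogicalPlan] = myRewrite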


 ## Strategies

 A similar solution could be applied to shorten the code for
 Strategies, which are total functions
 only because they are all supposed to manage the default case,
 possibly returning `Nil`. In this case
 we might introduce the following utility methods:

 /**
  * Generate a Strategy from a PartialFunction[LogicalPlan, SparkPlan].
  * The partial function must therefore return *one single* SparkPlan for each case.
  * The method will automatically wrap it in a [[Seq]].
  * Unhandled cases will automatically return Seq.empty.
  */
 protected def rule(pf: PartialFunction[LogicalPlan, SparkPlan]): Strategy =
   new Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] =
       if (pf.isDefinedAt(plan)) Seq(pf.apply(plan)) else Seq.empty
   }

 /**
  * Generate a Strategy from a PartialFunction[LogicalPlan, Seq[SparkPlan]].
  * The partial function must therefore return a Seq[SparkPlan] for each case.
  * Unhandled cases will automatically return Seq.empty.
  */
 protected def seqrule(pf: PartialFunction[LogicalPlan, Seq[SparkPlan]]): Strategy =
   new Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] =
       if (pf.isDefinedAt(plan)) pf.apply(plan) else Seq.empty[SparkPlan]
   }

 Thanks in advance
 e.v.







Re: [VOTE] Release Apache Spark 1.4.0 (RC1)

2015-05-20 Thread Sean Owen
Signature, hashes, LICENSE/NOTICE, and source tarball look OK. I built
for Hadoop 2.6 (-Pyarn -Phive -Phadoop-2.6) on Ubuntu from source and
the tests pass. The release looks OK except that I'd like to resolve the
Blockers before giving a +1.

I'm seeing some test failures, and wanted to cross-check with others.
They're all in Hive. Some I think are due to Java 8 differences and
are just test issues; they expect an exact output from a query plan
and some HashSet ordering differences make it trivially different. If
so, I've seen this in the past and we could ignore it for now, but it
would be good to get a second set of eyes. The trace is big, so it's at
the end.

When rerunning with Java 7 I get a different error due to Hive version support:

- success sanity check *** FAILED ***
  java.lang.RuntimeException: [download failed:
org.jboss.netty#netty;3.2.2.Final!netty.jar(bundle), download failed:
commons-net#commons-net;3.1!commons-net.jar]
  at 
org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:972)
  at 
org.apache.spark.sql.hive.client.IsolatedClientLoader$$anonfun$3.apply(IsolatedClientLoader.scala:62)
  ...



Hive / possible Java 8 test issue:

- windowing.q -- 20. testSTATs *** FAILED ***
  Results do not match for windowing.q -- 20. testSTATs:
  == Parsed Logical Plan ==
  'WithWindowDefinition Map(w1 - WindowSpecDefinition ROWS BETWEEN 2
PRECEDING AND 2 FOLLOWING)
   'Project ['p_mfgr,'p_name,'p_size,UnresolvedWindowExpression
WindowSpecReference(w1)
   UnresolvedWindowFunction stddev
UnresolvedAttribute [p_retailprice]
   AS sdev#159481,UnresolvedWindowExpression WindowSpecReference(w1)
   UnresolvedWindowFunction stddev_pop
UnresolvedAttribute [p_retailprice]
   AS sdev_pop#159482,UnresolvedWindowExpression WindowSpecReference(w1)
   UnresolvedWindowFunction collect_set
UnresolvedAttribute [p_size]
   AS uniq_size#159483,UnresolvedWindowExpression WindowSpecReference(w1)
   UnresolvedWindowFunction variance
UnresolvedAttribute [p_retailprice]
   AS var#159484,UnresolvedWindowExpression WindowSpecReference(w1)
   UnresolvedWindowFunction corr
UnresolvedAttribute [p_size]
UnresolvedAttribute [p_retailprice]
   AS cor#159485,UnresolvedWindowExpression WindowSpecReference(w1)
   UnresolvedWindowFunction covar_pop
UnresolvedAttribute [p_size]
UnresolvedAttribute [p_retailprice]
   AS covarp#159486]
'UnresolvedRelation [part], None

  == Analyzed Logical Plan ==
  p_mfgr: string, p_name: string, p_size: int, sdev: double, sdev_pop:
double, uniq_size: arrayint, var: double, cor: double, covarp:
double
  Project 
[p_mfgr#159489,p_name#159488,p_size#159492,sdev#159481,sdev_pop#159482,uniq_size#159483,var#159484,cor#159485,covarp#159486]
   Window [p_mfgr#159489,p_name#159488,p_size#159492,p_retailprice#159494],
[HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStd(p_retailprice#159494)
WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING AS
sdev#159481,HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStd(p_retailprice#159494)
WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING AS
sdev_pop#159482,HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectSet(p_size#159492)
WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING AS
uniq_size#159483,HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFVariance(p_retailprice#159494)
WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING AS
var#159484,HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCorrelation(p_size#159492,p_retailprice#159494)
WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING AS
cor#159485,HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCovariance(p_size#159492,p_retailprice#159494)
WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING AS
covarp#159486], WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2
FOLLOWING
Project [p_mfgr#159489,p_name#159488,p_size#159492,p_retailprice#159494]
 MetastoreRelation default, part, None

  == Optimized Logical Plan ==
  Project 
[p_mfgr#159489,p_name#159488,p_size#159492,sdev#159481,sdev_pop#159482,uniq_size#159483,var#159484,cor#159485,covarp#159486]
   Window [p_mfgr#159489,p_name#159488,p_size#159492,p_retailprice#159494],
[HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStd(p_retailprice#159494)
WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING AS
sdev#159481,HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStd(p_retailprice#159494)
WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING AS
sdev_pop#159482,HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectSet(p_size#159492)
WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING AS
uniq_size#159483,HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFVariance(p_retailprice#159494)
WindowSpecDefinition ROWS BETWEEN 2 

Re: Contribute code to MLlib

2015-05-20 Thread Trevor Grant
Hey Ram,

I'm not speaking to Tarek's package specifically but to the spirit of
MLlib.  There are a number of methods/algorithms for PCA, and I'm not sure by
what criterion the current one is considered 'standard'.

It is rare to find ANY machine learning algo that is 'clearly better' than
any other.  They are all tools; they have their place and time.  I agree
that it makes sense to field new algorithms as packages and then integrate
them into MLlib once they are 'proven' (in terms of stability/performance/anyone
cares).  That being said, if MLlib takes the stance that 'what we have is
good enough unless something is *clearly* better', then it will never grow
into a suite with the depth and richness of sklearn. From a practitioner's
standpoint, it's nice to have everything I could ever want ready in an
'off-the-shelf' form.

'A large number of use cases better than existing' shouldn't be a criterion
when selecting what to include in MLlib.  The important question should be,
'Are you willing to take on responsibility for maintaining this, because you
may be the only person on earth who understands the mechanics AND how to
code it?'.   Obviously we don't want any random junk algo included.  But
trying to say 'this way of doing PCA is better than that way in a large
class of cases' is like trying to say 'geometry is more important than
calculus in a large class of cases'. Maybe it's true, but geometry won't help
you if you are in a case where you need calculus.

This all relies on the assumption that MLlib is destined to be a rich data
science/machine learning package.  It may be that the goal is to make the
project as lightweight and parsimonious as possible; if so, excuse me for
speaking out of turn.


On Tue, May 19, 2015 at 10:41 AM, Ram Sriharsha sriharsha@gmail.com
wrote:

 Hi Trevor, Tarek

 You make non standard algorithms (PCA or otherwise) available to users of
 Spark as Spark Packages.
 http://spark-packages.org
 https://databricks.com/blog/2014/12/22/announcing-spark-packages.html

 With the availability of spark packages, adding powerful experimental /
 alternative machine learning algorithms to the pipeline has never been
 easier. I would suggest that route in scenarios where one machine learning
 algorithm is not clearly better in the common scenarios than an existing
 implementation in MLLib.

 If your algorithm is for a large class of use cases better than the
 existing PCA implementation, then we should open a JIRA and discuss the
 relative strengths/ weaknesses (perhaps with some benchmarks) so we can
 better understand if it makes sense to switch out the existing PCA
 implementation and make yours the default.

 Ram

 On Tue, May 19, 2015 at 6:56 AM, Trevor Grant trevor.d.gr...@gmail.com
 wrote:

  There are most likely advantages and disadvantages to Tarek's algorithm
 against the current implementation, and different scenarios where each is
 more appropriate.

 Would we not offer multiple PCA algorithms and let the user choose?

 Trevor

 Trevor Grant
 Data Scientist

 *Fortunate is he, who is able to know the causes of things.  -Virgil*


 On Mon, May 18, 2015 at 4:18 PM, Joseph Bradley jos...@databricks.com
 wrote:

 Hi Tarek,

 Thanks for your interest and for checking the guidelines first! On 2
 points:

 Algorithm: PCA is of course a critical algorithm.  The main question is
 how your algorithm/implementation differs from the current PCA.  If it's
 different and potentially better, I'd recommend opening up a JIRA for
 explaining and discussing it.

 Java/Scala: We really do require that algorithms be in Scala, for the
 sake of maintainability.  The conversion should be doable if you're willing
 since Scala is a pretty friendly language.  If you create the JIRA, you
 could also ask for help there to see if someone can collaborate with you to
 convert the code to Scala.

 Thanks!
 Joseph

 On Mon, May 18, 2015 at 3:13 AM, Tarek Elgamal tarek.elga...@gmail.com
 wrote:

 Hi,

 I would like to contribute an algorithm to the MLlib project. I have
 implemented a scalable PCA algorithm on spark. It is scalable for both tall
 and fat matrices and the paper around it is accepted for publication in
 SIGMOD 2015 conference. I looked at the guidelines in the following link:


 https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-MLlib-specificContributionGuidelines

 I believe that most of the guidelines apply in my case; however, the
 code is written in Java and it was not clear in the guidelines whether the
 MLlib project accepts Java code or not.
 My algorithm can be found under this repository:
 https://github.com/Qatar-Computing-Research-Institute/sPCA

 Any help on how to make it suitable for MLlib project will be greatly
 appreciated.

 Best Regards,
 Tarek Elgamal









Re: userClassPathFirst and loader constraint violation

2015-05-20 Thread Marcelo Vanzin
Hmm... this seems to be particular to logging (KafkaRDD.scala:89 in my tree
is a log statement). I'd expect KafkaRDD to be loaded from the system class
loader - or are you repackaging it in your app?

I'd have to investigate more to come up with an accurate explanation here...
but it seems that the initialization of the logging system, which happens
after SparkSubmit runs and sets the context class loader to be an instance
of ChildFirstURLClassLoader, is causing things to blow up. I'll see if I
can spend some cycles coming up with a proper explanation (and hopefully a
fix or workaround).

For now, you could probably avoid this by not repackaging the logging
dependencies in your app.
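
A hedged build.sbt sketch of that workaround (the coordinates and versions are
illustrative; the point is simply to keep SLF4J out of the assembled app jar so only one
copy is ever visible):

// Compile against SLF4J but do not bundle it into the fat jar.
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.10" % "provided"

// If SLF4J arrives transitively through another dependency, exclude it there instead
// (the dependency below is a placeholder).
libraryDependencies += ("com.example" %% "some-library" % "1.0.0")
  .exclude("org.slf4j", "slf4j-api")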


On Wed, May 20, 2015 at 5:03 AM, Sean Owen so...@cloudera.com wrote:

 (Marcelo you might have some insight on this one)

 Warning: this may just be because I'm doing something non-standard --
 trying to embed Spark in a Java app and feed it all the classpath it
 needs manually. But this was surprising enough that I wanted to ask.

 I have an app that includes among other things SLF4J. I have set
 spark.{driver,executor}.userClassPathFirst to true. If I run it and
 let it start a Spark job, it quickly fails with:

 2015-05-20 04:35:01,747 WARN  TaskSetManager:71 Lost task 0.0 in stage
 0.0 (TID 0, x.cloudera.com): java.lang.LinkageError: loader constraint
 violation: loader (instance of
 org/apache/spark/util/ChildFirstURLClassLoader) previously initiated
 loading for a different type with name org/slf4j/Logger
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
 at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
 at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
 at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at
 org.apache.spark.util.ChildFirstURLClassLoader.liftedTree1$1(MutableURLClassLoader.scala:74)
 at
 org.apache.spark.util.ChildFirstURLClassLoader.loadClass(MutableURLClassLoader.scala:73)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:89)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 ...

 I can see that this class was loaded from my app JAR:

 [Loaded org.slf4j.Logger from
 file:/home/sowen/oryx-batch-2.0.0-SNAPSHOT.jar]

 I'm assuming it's also loaded in some Spark classloader.
 Tracing the code, I don't see that it ever gets to consulting any
 other classloader; this happens during its own child-first attempt to
 load the class.

 This didn't happen in 1.2, FWIW, when the implementation was
 different, but that's only to say it was different, not correct.

 Anyone have thoughts on what this indicates? something to be expected
 or surprising?

 I think that disabling userClassPathFirst gets rid of this of course,
 although that may cause other issues later.




-- 
Marcelo


Re: Contribute code to MLlib

2015-05-20 Thread Ram Sriharsha
Hi Trevor

Good point; I didn't mean that some algorithm has to be clearly better than
another in every scenario to be included in MLlib. However, even if someone
is willing to be the maintainer of a piece of code, it does not make sense
to accept every possible algorithm into the core library.

That said, the specific algorithms should be discussed in the JIRA: as you
point out, there is no clear way to decide what algorithm to include and
what not to, and usually mature algorithms that serve a wide variety of
scenarios are easier to argue about but nothing prevents anyone from
opening a ticket to discuss any specific machine learning algorithm.

My suggestion was simply that for purposes of making experimental or newer
algorithms available to Spark users, it doesn't necessarily have to be in
the core library. Spark packages are good enough in this respect.

Isn't it better for newer algorithms to take this route and prove
themselves before we bring them into the core library? Especially given that
the barrier to using Spark packages is very low.

Ram



On Wed, May 20, 2015 at 9:05 AM, Trevor Grant trevor.d.gr...@gmail.com
wrote:

 Hey Ram,

 I'm not speaking to Tarek's package specifically but to the spirit of
 MLib.  There are a number of method/algorithms for PCA, I'm not sure by
 what criterion the current one is considered 'standard'.

 It is rare to find ANY machine learning algo that is 'clearly better' than
 any other.  They are all tools, they have their place and time.  I agree
 that it makes sense to field new algorithms as packages and then integrate
 into MLib once they are 'proven' (in terms of stability/performance/anyone
 cares).  That being said, if MLib takes the stance that 'what we have is
 good enough unless something is *clearly* better', then it will never
 grow into a suite with the depth and richness of sklearn. From a
 practitioner's stand point, its nice to have everything I could ever want
 ready in an 'off-the-shelf' form.

 'A large number of use cases better than existing' shouldn't be a criteria
 when selecting what to include in MLib.  The important question should be,
 'Are you willing to take on responsibility for maintaining this because you
 may be the only person on earth who understands the mechanics AND how to
 code it?'.   Obviously we don't want any random junk algo included.  But
 trying to say, 'this way of doing PCA is better than that way in a large
 class of cases' is like trying to say 'geometry is more important than
 calculus in large class of cases, maybe its true- but geometry won't help
 you if you are in a case where you need calculus.

 This all relies on the assumption that MLib is destined to be a rich data
 science/machine learning package.  It may be that the goal is to make the
 project as lightweight and parsimonious as possible, if so excuse me for
 speaking out of turn.


 On Tue, May 19, 2015 at 10:41 AM, Ram Sriharsha sriharsha@gmail.com
 wrote:

 Hi Trevor, Tarek

 You make non standard algorithms (PCA or otherwise) available to users of
 Spark as Spark Packages.
 http://spark-packages.org
 https://databricks.com/blog/2014/12/22/announcing-spark-packages.html

 With the availability of spark packages, adding powerful experimental /
 alternative machine learning algorithms to the pipeline has never been
 easier. I would suggest that route in scenarios where one machine learning
 algorithm is not clearly better in the common scenarios than an existing
 implementation in MLLib.

 If your algorithm is for a large class of use cases better than the
 existing PCA implementation, then we should open a JIRA and discuss the
 relative strengths/ weaknesses (perhaps with some benchmarks) so we can
 better understand if it makes sense to switch out the existing PCA
 implementation and make yours the default.

 Ram

 On Tue, May 19, 2015 at 6:56 AM, Trevor Grant trevor.d.gr...@gmail.com
 wrote:

  There are most likely advantages and disadvantages to Tarek's algorithm
 against the current implementation, and different scenarios where each is
 more appropriate.

 Would we not offer multiple PCA algorithms and let the user choose?

 Trevor

 Trevor Grant
 Data Scientist

 *Fortunate is he, who is able to know the causes of things.  -Virgil*


 On Mon, May 18, 2015 at 4:18 PM, Joseph Bradley jos...@databricks.com
 wrote:

 Hi Tarek,

 Thanks for your interest and for checking the guidelines first! On 2
 points:

 Algorithm: PCA is of course a critical algorithm.  The main question is
 how your algorithm/implementation differs from the current PCA.  If it's
 different and potentially better, I'd recommend opening up a JIRA for
 explaining and discussing it.

 Java/Scala: We really do require that algorithms be in Scala, for the
 sake of maintainability.  The conversion should be doable if you're willing
 since Scala is a pretty friendly language.  If you create the JIRA, you
 could also ask for help there to see if someone can collaborate with you to
 convert 

Re: Contribute code to MLlib

2015-05-20 Thread Ram Sriharsha
Hi Trevor

I'm attaching the MLlib contribution guideline here:
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-MLlib-specificContributionGuidelines

It speaks to widely known and accepted algorithms, but not to whether an
algorithm has to be better than another in every scenario, etc.

I think the guideline explains what a good contribution to the core library
should look like better than I initially attempted to!

Sent from my iPhone

 On May 20, 2015, at 9:31 AM, Ram Sriharsha sriharsha@gmail.com wrote:
 
 Hi Trevor
 
 Good point, I didn't mean that some algorithm has to be clearly better than 
 another in every scenario to be included in MLLib. However, even if someone 
 is willing to be the maintainer of a piece of code, it does not make sense to 
 accept every possible algorithm into the core library.
 
 That said, the specific algorithms should be discussed in the JIRA: as you 
 point out, there is no clear way to decide what algorithm to include and what 
 not to, and usually mature algorithms that serve a wide variety of scenarios 
 are easier to argue about but nothing prevents anyone from opening a ticket 
 to discuss any specific machine learning algorithm.
 
 My suggestion was simply that for purposes of making experimental or newer 
 algorithms available to Spark users, it doesn't necessarily have to be in the 
 core library. Spark packages are good enough in this respect.
 
 Isn't it better for newer algorithms to take this route and prove themselves 
 before we bring them into the core library? Especially given the barrier to 
 using spark packages is very low.
 
 Ram
 
 
 
 On Wed, May 20, 2015 at 9:05 AM, Trevor Grant trevor.d.gr...@gmail.com 
 wrote:
 Hey Ram,
 
 I'm not speaking to Tarek's package specifically but to the spirit of MLib.  
 There are a number of method/algorithms for PCA, I'm not sure by what 
 criterion the current one is considered 'standard'.  
 
 It is rare to find ANY machine learning algo that is 'clearly better' than 
 any other.  They are all tools, they have their place and time.  I agree 
 that it makes sense to field new algorithms as packages and then integrate 
 into MLib once they are 'proven' (in terms of stability/performance/anyone 
 cares).  That being said, if MLib takes the stance that 'what we have is 
 good enough unless something is clearly better', then it will never grow 
 into a suite with the depth and richness of sklearn. From a practitioner's 
 stand point, its nice to have everything I could ever want ready in an 
 'off-the-shelf' form. 
 
 'A large number of use cases better than existing' shouldn't be a criteria 
 when selecting what to include in MLib.  The important question should be, 
 'Are you willing to take on responsibility for maintaining this because you 
 may be the only person on earth who understands the mechanics AND how to 
 code it?'.   Obviously we don't want any random junk algo included.  But 
 trying to say, 'this way of doing PCA is better than that way in a large 
 class of cases' is like trying to say 'geometry is more important than 
 calculus in large class of cases, maybe its true- but geometry won't help 
 you if you are in a case where you need calculus. 
 
 This all relies on the assumption that MLib is destined to be a rich data 
 science/machine learning package.  It may be that the goal is to make the 
 project as lightweight and parsimonious as possible, if so excuse me for 
 speaking out of turn. 
   
 
 On Tue, May 19, 2015 at 10:41 AM, Ram Sriharsha sriharsha@gmail.com 
 wrote:
 Hi Trevor, Tarek
 
 You make non standard algorithms (PCA or otherwise) available to users of 
 Spark as Spark Packages.
 http://spark-packages.org
 https://databricks.com/blog/2014/12/22/announcing-spark-packages.html
 
 With the availability of spark packages, adding powerful experimental / 
 alternative machine learning algorithms to the pipeline has never been 
 easier. I would suggest that route in scenarios where one machine learning 
 algorithm is not clearly better in the common scenarios than an existing 
 implementation in MLLib.
 
 If your algorithm is for a large class of use cases better than the 
 existing PCA implementation, then we should open a JIRA and discuss the 
 relative strengths/ weaknesses (perhaps with some benchmarks) so we can 
 better understand if it makes sense to switch out the existing PCA 
 implementation and make yours the default.
 
 Ram
 
 On Tue, May 19, 2015 at 6:56 AM, Trevor Grant trevor.d.gr...@gmail.com 
 wrote:
  There are most likely advantages and disadvantages to Tarek's algorithm 
 against the current implementation, and different scenarios where each is 
 more appropriate.
 
 Would we not offer multiple PCA algorithms and let the user choose?
 
 Trevor
 
 Trevor Grant
 Data Scientist
 
 Fortunate is he, who is able to know the causes of things.  -Virgil
 
 
 On Mon, May 18, 2015 at 4:18 PM, Joseph Bradley jos...@databricks.com 
 

IndexedRowMatrix semantics

2015-05-20 Thread Debasish Das
Hi,

Both IndexedRowMatrix and RowMatrix take an RDD[Vector]. Is it possible
for that RDD to intermix dense and sparse vectors? Basically, I am
considering a gemv flow when the IndexedRowMatrix is flagged as dense, and a
dot flow otherwise...

Thanks.
Deb


Re: IndexedRowMatrix semantics

2015-05-20 Thread Joseph Bradley
I believe it works with a mix of DenseVector and SparseVector types.
Joseph

On Wed, May 20, 2015 at 10:06 AM, Debasish Das debasish.da...@gmail.com
wrote:

 Hi,

 For indexedrowmatrix and rowmatrix, both take RDD(vector)is it
 possible that it has intermixed dense and sparse vectorbasically I am
 considering a gemv flow when indexedrowmatrix has dense flag true, dot flow
 otherwise...

 Thanks.
 Deb
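
For reference, a minimal sketch of an IndexedRowMatrix built from a mix of dense and
sparse rows (assumes a SparkContext named sc and spark-mllib on the classpath):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

val rows = sc.parallelize(Seq(
  IndexedRow(0L, Vectors.dense(1.0, 0.0, 3.0)),            // dense row
  IndexedRow(1L, Vectors.sparse(3, Array(1), Array(2.0)))  // sparse row
))
val mat = new IndexedRowMatrix(rows)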



Re: Spark Streaming - Design considerations/Knobs

2015-05-20 Thread Tathagata Das
Correcting the ones that are incorrect or incomplete. BUT this is a good
list of things to remember about Spark Streaming.


On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat hemant9...@gmail.com
wrote:

 Hi,

 I have compiled a list (from online sources) of knobs/design
 considerations that need to be taken care of by applications running on
 spark streaming. Is my understanding correct?  Any other important design
 consideration that I should take care of?


- A DStream is associated with a single receiver. For attaining read
parallelism multiple receivers i.e. multiple DStreams need to be created.
- A receiver is run within an executor. It occupies one core. Ensure
that there are enough cores for processing after receiver slots are booked
i.e. spark.cores.max should take the receiver slots into account.
- The receivers are allocated to executors in a round robin fashion.
- When data is received from a stream source, receiver creates blocks
of data.  A new block of data is generated every blockInterval
milliseconds. N blocks of data are created during the batchInterval where N
= batchInterval/blockInterval.
- These blocks are distributed by the BlockManager of the current
executor to the block managers of other executors. After that, the Network
Input Tracker running on the driver is informed about the block locations
for further processing.
- A RDD is created on the driver for the blocks created during the
batchInterval. The blocks generated during the batchInterval are partitions
of the RDD. Each partition is a task in spark. blockInterval==
batchinterval would mean that a single partition is created and probably it
is processed locally.

 The map tasks on the blocks are processed in the executors that have the
blocks (the one that received the block, and another where the block was
replicated), irrespective of the block interval, unless non-local scheduling
kicks in (as you observed next).


- Having bigger blockinterval means bigger blocks. A high value of
spark.locality.wait increases the chance of processing a block on the local
node. A balance needs to be found out between these two parameters to
ensure that the bigger blocks are processed locally.
- Instead of relying on batchInterval and blockInterval, you can
define the number of partitions by calling dstream.repartition(n). This
reshuffles the data in RDD randomly to create n number of partitions.

 Yes, for greater parallelism. Though it comes at the cost of a shuffle.


- An RDD's processing is scheduled by driver's jobscheduler as a job.
At a given point of time only one job is active. So, if one job is
executing the other jobs are queued.


- If you have two dstreams there will be two RDDs formed and there
will be two jobs created which will be scheduled one after the another.


- To avoid this, you can union two dstreams. This will ensure that a
single unionRDD is formed for the two RDDs of the dstreams. This unionRDD
is then considered as a single job. However the partitioning of the RDDs is
not impacted.

 To further clarify, the jobs depend on the number of output operations
(print, foreachRDD, saveAsXFiles) and the number of RDD actions in those
output operations.

dstream1.union(dstream2).foreachRDD { rdd => rdd.count() }  // one Spark job per batch

dstream1.union(dstream2).foreachRDD { rdd => { rdd.count(); rdd.count() } }  // TWO Spark jobs per batch

dstream1.foreachRDD { rdd => rdd.count() }; dstream2.foreachRDD { rdd => rdd.count() }  // TWO Spark jobs per batch






-
- If the batch processing time is more than batchinterval then
obviously the receiver's memory will start filling up and will end up in
throwing exceptions (most probably BlockNotFoundException). Currently there
is  no way to pause the receiver.

 You can limit the rate of the receiver using the SparkConf setting
spark.streaming.receiver.maxRate, as shown in the sketch below.
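
A minimal sketch (the rate value is an illustrative assumption):

import org.apache.spark.SparkConf

// Cap each receiver at 10,000 records per second.
val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "10000")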


-
- For being fully fault tolerant, spark streaming needs to enable
checkpointing. Checkpointing increases the batch processing time.

 Incomplete. There are two types of checkpointing - data and metadata. Only
data checkpointing, needed by only some operations, increases batch
processing time. Read -
http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
Furthermore, with checkpointing you can recover the computation, but you may
lose some data (that was received but not processed before the driver failed)
for some sources. Enabling write ahead logs plus a reliable source and
receiver allows zero data loss. Read - WAL in
http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics


- The frequency of metadata checkpoint cleaning can be controlled
using spark.cleaner.ttl. But, data checkpoint cleaning happens
automatically when the RDDs in the checkpoint are no more required.


 Incorrect. metadata checkpointing or 

Re: Contribute code to MLlib

2015-05-20 Thread Joseph Bradley
Hi Trevor,

I may be repeating what Ram said, but to 2nd it, a few points:

We do want MLlib to become an extensive and rich ML library; as you said,
scikit-learn is a great example.  To make that happen, we of course need to
include important algorithms.  Important is hazy, but roughly means being
useful to a large number of users, improving a large number of use cases
(above what is currently available), and being well-established and tested.

Others and I may not be familiar with Tarek's algorithm (since it is so
new), so it will be important to discuss details on JIRA to establish the
cases in which the algorithm improves over current PCA.  That may require
discussion, community testing, etc.  If we establish that it is a clear
improvement in a large domain, then it could be valuable to have in MLlib
proper.  It's always going to be hard to tell where to draw the line, so
less common algorithms will require more testing before we commit to
including them in MLlib.

I like the Spark package suggestion since it would allow users to immediately
start using the code, while the discussion on JIRA happens.  (Plus, if
package users find it useful, they can report that on the JIRA.)

Joseph

On Wed, May 20, 2015 at 10:01 AM, Ram Sriharsha sriharsha@gmail.com
wrote:

 Hi Trevor

 I'm attaching the MLLib contribution guideline here:

 https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-MLlib-specificContributionGuidelines

 It speaks to widely known and accepted algorithms but not to whether an
 algorithm has to be better than another in every scenario etc

 I think the guideline explains what a good contribution to the core
 library should look like better than I initially attempted to !

 Sent from my iPhone

 On May 20, 2015, at 9:31 AM, Ram Sriharsha sriharsha@gmail.com
 wrote:

 Hi Trevor

 Good point, I didn't mean that some algorithm has to be clearly better
 than another in every scenario to be included in MLLib. However, even if
 someone is willing to be the maintainer of a piece of code, it does not
 make sense to accept every possible algorithm into the core library.

 That said, the specific algorithms should be discussed in the JIRA: as you
 point out, there is no clear way to decide what algorithm to include and
 what not to, and usually mature algorithms that serve a wide variety of
 scenarios are easier to argue about but nothing prevents anyone from
 opening a ticket to discuss any specific machine learning algorithm.

 My suggestion was simply that for purposes of making experimental or newer
 algorithms available to Spark users, it doesn't necessarily have to be in
 the core library. Spark packages are good enough in this respect.

 Isn't it better for newer algorithms to take this route and prove
 themselves before we bring them into the core library? Especially given the
 barrier to using spark packages is very low.

 Ram



 On Wed, May 20, 2015 at 9:05 AM, Trevor Grant trevor.d.gr...@gmail.com
 wrote:

 Hey Ram,

 I'm not speaking to Tarek's package specifically but to the spirit of
 MLib.  There are a number of method/algorithms for PCA, I'm not sure by
 what criterion the current one is considered 'standard'.

 It is rare to find ANY machine learning algo that is 'clearly better'
 than any other.  They are all tools, they have their place and time.  I
 agree that it makes sense to field new algorithms as packages and then
 integrate into MLib once they are 'proven' (in terms of
 stability/performance/anyone cares).  That being said, if MLib takes the
 stance that 'what we have is good enough unless something is *clearly*
 better', then it will never grow into a suite with the depth and richness
 of sklearn. From a practitioner's stand point, its nice to have everything
 I could ever want ready in an 'off-the-shelf' form.

 'A large number of use cases better than existing' shouldn't be a
 criteria when selecting what to include in MLib.  The important question
 should be, 'Are you willing to take on responsibility for maintaining this
 because you may be the only person on earth who understands the mechanics
 AND how to code it?'.   Obviously we don't want any random junk algo
 included.  But trying to say, 'this way of doing PCA is better than that
 way in a large class of cases' is like trying to say 'geometry is more
 important than calculus in large class of cases, maybe its true- but
 geometry won't help you if you are in a case where you need calculus.

 This all relies on the assumption that MLib is destined to be a rich data
 science/machine learning package.  It may be that the goal is to make the
 project as lightweight and parsimonious as possible, if so excuse me for
 speaking out of turn.


 On Tue, May 19, 2015 at 10:41 AM, Ram Sriharsha sriharsha@gmail.com
 wrote:

 Hi Trevor, Tarek

 You make non standard algorithms (PCA or otherwise) available to users
 of Spark as Spark Packages.
 http://spark-packages.org
 

Re: Representing a recursive data type in Spark SQL

2015-05-20 Thread Rakesh Chalasani
Hi Jeremy:

Row is a collection of 'Any', so it can be used as a recursive data type. Is
this what you were looking for?

Example:
val x = sc.parallelize(Array.range(0,10)).map(x => Row(Row(x),
Row(x.toString)))

Rakesh



On Wed, May 20, 2015 at 7:23 PM Jeremy Lucas jeremyalu...@gmail.com wrote:

 Spark SQL has proven to be quite useful in applying a partial schema to
 large JSON logs and being able to write plain SQL to perform a wide variety
 of operations over this data. However, one small thing that keeps coming
 back to haunt me is the lack of support for recursive data types, whereby a
 member of a complex/struct value can be of the same type as the
 complex/struct value itself.

 I am hoping someone may be able to point me in the right direction of
 where to start to build out such capabilities, as I'd be happy to
 contribute, but am very new to this particular component of the Spark
 project.



Re: Representing a recursive data type in Spark SQL

2015-05-20 Thread Jeremy Lucas
Hey Rakesh,

To clarify, what I was referring to is when doing something like this:

sqlContext.applySchema(rdd, mySchema)

mySchema must be a well-defined StructType, which presently does not allow
for a recursive type.
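
To illustrate the limitation (the field names are purely illustrative): a StructType is
assembled from already-constructed DataTypes, so a field cannot refer back to the schema
being defined.

import org.apache.spark.sql.types._

val nodeSchema: StructType = StructType(Seq(
  StructField("value", StringType, nullable = false),
  // A truly recursive field would need something like ArrayType(nodeSchema) here,
  // but nodeSchema cannot reference itself while it is being constructed.
  StructField("children", ArrayType(StringType), nullable = true)
))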


On Wed, May 20, 2015 at 5:39 PM Rakesh Chalasani vnit.rak...@gmail.com
wrote:

 Hi Jeremy:

 Row is a collection of 'Any', so it can be used as a recursive data type.
 Is this what you were looking for?

 Example:
 val x = sc.parallelize(Array.range(0,10)).map(x => Row(Row(x),
 Row(x.toString)))

 Rakesh



 On Wed, May 20, 2015 at 7:23 PM Jeremy Lucas jeremyalu...@gmail.com
 wrote:

 Spark SQL has proven to be quite useful in applying a partial schema to
 large JSON logs and being able to write plain SQL to perform a wide variety
 of operations over this data. However, one small thing that keeps coming
 back to haunt me is the lack of support for recursive data types, whereby a
 member of a complex/struct value can be of the same type as the
 complex/struct value itself.

 I am hoping someone may be able to point me in the right direction of
 where to start to build out such capabilities, as I'd be happy to
 contribute, but am very new to this particular component of the Spark
 project.




Representing a recursive data type in Spark SQL

2015-05-20 Thread Jeremy Lucas
Spark SQL has proven to be quite useful in applying a partial schema to
large JSON logs and being able to write plain SQL to perform a wide variety
of operations over this data. However, one small thing that keeps coming
back to haunt me is the lack of support for recursive data types, whereby a
member of a complex/struct value can be of the same type as the
complex/struct value itself.

I am hoping someone may be able to point me in the right direction of where
to start to build out such capabilities, as I'd be happy to contribute, but
am very new to this particular component of the Spark project.


Low throughput and effect of GC in SparkSql GROUP BY

2015-05-20 Thread Pramod Biligiri
Hi,
Somewhat similar to Daniel Mescheder's mail yesterday on SparkSql, I have a
data point regarding the performance of Group By, indicating there's
excessive GC and it's impacting the throughput. I want to know if the new
memory manager for aggregations (https://github.com/apache/spark/pull/5725/)
is going to address this kind of issue.

I only have a small amount of data on each node (~360MB) with a large heap
size (18 Gig). I still see 2-3 minor collections happening whenever I do a
Select Sum() with a group by(). I have tried with different sizes for Young
Generation without much effect, though not with different GC algorithms
(Hm..I ought to try reducing the rdd storage fraction perhaps).

I have made a chart of my results [1] by adding timing code to
Aggregates.scala. The query is actually Query 2 from Berkeley's AmpLab
benchmark, running over 10 million records. The chart is from one of the 4
worker nodes in the cluster.

I am trying to square this with a claim on the Project Tungsten blog post
[2]: "When profiling Spark user applications, we’ve found that a large
fraction of the CPU time is spent waiting for data to be fetched from main
memory."

Am I correct in assuming that SparkSql is yet to reach that level of
efficiency, at least in aggregation operations?

Thanks.

[1] -
https://docs.google.com/spreadsheets/d/1HSqYfic3n5s9i4Wsi1Qg0FKN_AWz2vV7_6RRMrtzplQ/edit#gid=481134174
[2]
https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html

Pramod


Re: Low throughput and effect of GC in SparkSql GROUP BY

2015-05-20 Thread Reynold Xin
Does this turn codegen on? I think the performance is fairly different when
codegen is turned on.

For 1.5, we are investigating having codegen on by default, so users get
much better performance out of the box.
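
For anyone who wants to check, a minimal sketch of turning codegen on in 1.3/1.4, where
it is off by default (assumes a SQLContext named sqlContext):

// Enable expression code generation for this SQLContext.
sqlContext.setConf("spark.sql.codegen", "true")
// Equivalent at submit time: --conf spark.sql.codegen=true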


On Wed, May 20, 2015 at 5:24 PM, Pramod Biligiri pramodbilig...@gmail.com
wrote:

 Hi,
 Somewhat similar to Daniel Mescheder's mail yesterday on SparkSql, I have
 a data point regarding the performance of Group By, indicating there's
 excessive GC and it's impacting the throughput. I want to know if the new
 memory manager for aggregations (
 https://github.com/apache/spark/pull/5725/) is going to address this kind
 of issue.

 I only have a small amount of data on each node (~360MB) with a large heap
 size (18 Gig). I still see 2-3 minor collections happening whenever I do a
 Select Sum() with a group by(). I have tried with different sizes for Young
 Generation without much effect, though not with different GC algorithms
 (Hm..I ought to try reducing the rdd storage fraction perhaps).

 I have made a chart of my results [1] by adding timing code to
 Aggregates.scala. The query is actually Query 2 from Berkeley's AmpLab
 benchmark, running over 10 million records. The chart is from one of the 4
 worker nodes in the cluster.

 I am trying to square this with a claim on the Project Tungsten blog post
 [2]: When profiling Spark user applications, we’ve found that a large
 fraction of the CPU time is spent waiting for data to be fetched from main
 memory. 

 Am I correct in assuming that SparkSql is yet to reach that level of
 efficiency, at least in aggregation operations?

 Thanks.

 [1] -
 https://docs.google.com/spreadsheets/d/1HSqYfic3n5s9i4Wsi1Qg0FKN_AWz2vV7_6RRMrtzplQ/edit#gid=481134174
 [2]
 https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html

 Pramod



Re: Performance & Memory Issues When Creating Many Columns in GROUP BY (spark-sql)

2015-05-20 Thread Reynold Xin
It is a lot of columns, but I'm not sure if that's why it is running out of
memory. In Spark SQL, we are not yet doing external aggregation when the
number of keys is large in the aggregation hashmap. We will fix this and
have external aggregation in 1.5.
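
For reference, a hedged RDD-level sketch of the pre-partitioning mitigation described in
the quoted message below (assumes a SparkContext named sc; the key space and partition
count are illustrative):

import org.apache.spark.HashPartitioner

// An assumed toy input of (groupKey, value) pairs.
val rows = sc.parallelize(1 to 1000000).map(i => (s"key-${i % 100000}", 1L))

// Pre-partition by key so each partition holds fewer distinct groups,
// keeping the per-partition aggregation hash table smaller.
val prePartitioned = rows.partitionBy(new HashPartitioner(400))

// reduceByKey reuses the partitioner, so no further shuffle is needed here.
val counts = prePartitioned.reduceByKey(_ + _)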


On Tue, May 19, 2015 at 2:43 AM, daniel.mescheder 
daniel.mesche...@realimpactanalytics.com wrote:

 Dear List,

 We have run into serious problems trying to run a larger than average
 number of aggregations in a GROUP BY query. Symptoms of this problem are
 OutOfMemory exceptions and unreasonably long processing times due to
 GC. The problem occurs when the following two conditions are met:

  - The number of groups is relatively large (growing with the size of the
 dataset)
  - The number of columns is relatively large

 To reproduce, paste the following gist into your spark-shell (I'm running
 1.3.1): https://gist.github.com/DanielMe/9467bb0d9ad3aa639429

 This example is relatively small in size:

  - The size of the input is 10ˆ6 * 64bit = 8MB
  - The size of the output should be around 3 * 10ˆ8 * 64bit = 2.4GB
  - The aggregations themselves are just count(1) and hence not so
 difficult to compute

 I am running this on a cluster with three 61GB worker machines and an
 equally equipped master with the following spark-defaults.conf:

 spark.executor.memory=55g
 spark.driver.memory=55g

 The result: The workers will choke with java.lang.OutOfMemoryError: GC
 overhead limit exceeded. In fact, if you play with the num_columns
 parameter you should observe an unreasonable amount of time spent on GC
 even for lower values. If you run this on a desktop machine, low values of
 num_columns should already lead to OOM crashes.

 My questions are:

  - What causes this behaviour?
  - Can/should catalyst be able to automatically optimize queries of this
 kind to run in reasonable time or at least not crash?
  - What are  possible workarounds to achieve the desired effect? (Even if
 that means not using DataFrames but going down to the raw RDD level)

 Our preliminary analysis of the situation concluded that what is blowing
 up is in fact the hashTable in Aggregate::doExecute which will try to store
 the cross product of groups and columns on each partition. In fact, we
 managed to mitigate the issue a bit by

  - reducing the size of the partitions (which will make these hash tables
 smaller)
  - pre-partitioning the data using a HashPartitioner on the key (which
 will reduce the number of different groups per partition)

  The latter actually seems to be a sensible thing to do whenever
 num_columns*num_groups > num_rows, because in this setting the amount of
 data we have to shuffle around after the first aggregation step is actually
 larger than the amount of data we had initially. Could this be something
 that catalyst should take into account when creating a physical plan?

 Thanks in advance.

 Kind regards,


 Daniel





Re: [VOTE] Release Apache Spark 1.4.0 (RC1)

2015-05-20 Thread Imran Rashid
-1

discovered I accidentally removed the master & worker JSON endpoints; will
restore:
https://issues.apache.org/jira/browse/SPARK-7760

On Tue, May 19, 2015 at 11:10 AM, Patrick Wendell pwend...@gmail.com
wrote:

 Please vote on releasing the following candidate as Apache Spark version
 1.4.0!

 The tag to be voted on is v1.4.0-rc1 (commit 777a081):

 https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=777a08166f1fb144146ba32581d4632c3466541e

 The release files, including signatures, digests, etc. can be found at:
 http://people.apache.org/~pwendell/spark-1.4.0-rc1/

 Release artifacts are signed with the following key:
 https://people.apache.org/keys/committer/pwendell.asc

 The staging repository for this release can be found at:
 https://repository.apache.org/content/repositories/orgapachespark-1092/

 The documentation corresponding to this release can be found at:
 http://people.apache.org/~pwendell/spark-1.4.0-rc1-docs/

 Please vote on releasing this package as Apache Spark 1.4.0!

 The vote is open until Friday, May 22, at 17:03 UTC and passes
 if a majority of at least 3 +1 PMC votes are cast.

 [ ] +1 Release this package as Apache Spark 1.4.0
 [ ] -1 Do not release this package because ...

 To learn more about Apache Spark, please see
 http://spark.apache.org/

 == How can I help test this release? ==
 If you are a Spark user, you can help us test this release by
 taking a Spark 1.3 workload and running on this release candidate,
 then reporting any regressions.

 == What justifies a -1 vote for this release? ==
 This vote is happening towards the end of the 1.4 QA period,
 so -1 votes should only occur for significant regressions from 1.3.1.
 Bugs already present in 1.3.X, minor regressions, or bugs related
 to new features will not block this release.
