Spark Streaming - Design considerations/Knobs
Hi, I have compiled a list (from online sources) of knobs/design considerations that need to be taken care of by applications running on Spark Streaming. Is my understanding correct? Are there any other important design considerations I should take care of?

- A DStream is associated with a single receiver. To attain read parallelism, multiple receivers, i.e. multiple DStreams, need to be created.
- A receiver runs within an executor and occupies one core. Ensure that there are enough cores for processing after the receiver slots are booked, i.e. spark.cores.max should take the receiver slots into account.
- Receivers are allocated to executors in a round-robin fashion.
- When data is received from a stream source, the receiver creates blocks of data. A new block of data is generated every blockInterval milliseconds. N blocks of data are created during the batchInterval, where N = batchInterval/blockInterval.
- These blocks are distributed by the BlockManager of the current executor to the block managers of other executors. After that, the Network Input Tracker running on the driver is informed about the block locations for further processing.
- An RDD is created on the driver for the blocks created during the batchInterval. The blocks generated during the batchInterval are the partitions of the RDD, and each partition is a task in Spark. blockInterval == batchInterval would mean that a single partition is created, and it is probably processed locally.
- A bigger blockInterval means bigger blocks. A high value of spark.locality.wait increases the chance of processing a block on the local node. A balance needs to be found between these two parameters to ensure that the bigger blocks are processed locally.
- Instead of relying on batchInterval and blockInterval, you can define the number of partitions by calling dstream.repartition(n). This reshuffles the data in the RDD randomly to create n partitions.
- An RDD's processing is scheduled by the driver's JobScheduler as a job. At a given point of time only one job is active, so if one job is executing the other jobs are queued.
- If you have two DStreams there will be two RDDs formed, and two jobs will be created which are scheduled one after the other.
- To avoid this, you can union the two DStreams. This ensures that a single UnionRDD is formed from the two RDDs of the DStreams, and that UnionRDD is then processed as a single job. The partitioning of the RDDs is not impacted, however.
- If the batch processing time is more than the batchInterval, then obviously the receiver's memory will start filling up, which ends up throwing exceptions (most probably BlockNotFoundException). Currently there is no way to pause the receiver.
- To be fully fault tolerant, Spark Streaming needs checkpointing enabled. Checkpointing increases the batch processing time.
- The frequency of metadata checkpoint cleaning can be controlled using spark.cleaner.ttl. But data checkpoint cleaning happens automatically when the RDDs in the checkpoint are no longer required.

Thanks, Hemant
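For concreteness, a minimal sketch of the receiver-parallelism-plus-union pattern described in the list above (the host/port and all the numbers are placeholders, not recommendations):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("streaming-knobs-sketch")
val ssc = new StreamingContext(conf, Seconds(10)) // batchInterval = 10s

// Three receivers => three input DStreams => 3x read parallelism,
// and three cores permanently occupied by receivers.
val streams = (1 to 3).map(_ => ssc.socketTextStream("stream-host", 9999))

// Union them so each batch yields a single job instead of three,
// then repartition to decouple the task count from blockInterval.
val unified = ssc.union(streams).repartition(12)

unified.foreachRDD { rdd => println(rdd.count()) }

ssc.start()
ssc.awaitTermination()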
userClassPathFirst and loader constraint violation
(Marcelo you might have some insight on this one.)

Warning: this may just be because I'm doing something non-standard -- trying to embed Spark in a Java app and feed it all the classpath it needs manually. But this was surprising enough that I wanted to ask.

I have an app that includes, among other things, SLF4J. I have set spark.{driver,executor}.userClassPathFirst to true. If I run it and let it start a Spark job, it quickly fails with:

2015-05-20 04:35:01,747 WARN TaskSetManager:71 Lost task 0.0 in stage 0.0 (TID 0, x.cloudera.com): java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/spark/util/ChildFirstURLClassLoader) previously initiated loading for a different type with name org/slf4j/Logger
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at org.apache.spark.util.ChildFirstURLClassLoader.liftedTree1$1(MutableURLClassLoader.scala:74)
at org.apache.spark.util.ChildFirstURLClassLoader.loadClass(MutableURLClassLoader.scala:73)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
...

I can see that this class was loaded from my app JAR:

[Loaded org.slf4j.Logger from file:/home/sowen/oryx-batch-2.0.0-SNAPSHOT.jar]

I'm assuming it's also loaded in some Spark classloader. Tracing the code, I don't see that it ever gets to consulting any other classloader; this happens during its own child-first attempt to load the class. This didn't happen in 1.2, FWIW, when the implementation was different, but that's only to say it was different, not necessarily correct.

Anyone have thoughts on what this indicates? Is it something to be expected, or surprising? I think that disabling userClassPathFirst gets rid of this, of course, although that may cause other issues later.
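For reference, the settings in question can also be set programmatically -- a sketch, not the actual app code:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.userClassPathFirst", "true")
  .set("spark.executor.userClassPathFirst", "true")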
Regarding Connecting spark to Mesos documentation
Hi List, In the documentation for Connecting Spark to Mesos (http://spark.apache.org/docs/latest/running-on-mesos.html#connecting-spark-to-mesos), is it possible to modify and describe in detail the step "Create a binary package using make-distribution.sh --tgz"? When we use a custom-compiled version of Spark, we mostly specify a Hadoop version (which is not the default one). In this case, make-distribution.sh should be supplied the same Maven options we used for building Spark. This is not specified in the documentation. Please correct me if I am wrong. Regards, Meethu Mathew
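For example (the Hadoop version here is purely illustrative), if Spark was built with -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0, the packaging step should repeat the same flags:

./make-distribution.sh --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0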
Re: [Catalyst] RFC: Using PartialFunction literals instead of objects
Thanks for the prompt feedback; I have further expanded on your suggestions in this JIRA: https://issues.apache.org/jira/browse/SPARK-7754

On Tue, May 19, 2015 at 8:35 PM, Michael Armbrust mich...@databricks.com wrote:

Overall this seems like a reasonable proposal to me. Here are a few thoughts:
- There is some debugging utility to the ruleName, so we would probably want to at least make that an argument to the rule function.
- We also have had rules that operate on SparkPlan, though since there is only one ATM maybe we don't need sugar there.
- I would not call the sugar for creating Strategies rule/seqrule, as I think the one-to-one vs one-to-many distinction is useful.
- I'm generally pro-refactoring to make the code nicer, especially when it's not official public API, but I do think it's important to maintain source compatibility (which I think you are) when possible, as there are other projects using Catalyst.
- Finally, we'll have to balance this with other code changes / conflicts. You should probably open a JIRA and we can continue the discussion there.

On Tue, May 19, 2015 at 4:16 AM, Edoardo Vacchi uncommonnonse...@gmail.com wrote:

Hi everybody,

At the moment, Catalyst rules are defined using two different types of rules: `Rule[LogicalPlan]` and `Strategy` (which in turn maps to `GenericStrategy[SparkPlan]`). I propose to introduce utility methods to a) reduce the boilerplate needed to define rewrite rules and b) turn them back into what they essentially represent: function types. These changes would be backwards compatible and would greatly help in understanding what the code does. Personally, I feel the current use of objects is redundant and possibly confusing.

## `Rule[LogicalPlan]`

The analyzer and optimizer use `Rule[LogicalPlan]`, which, besides defining a default `val ruleName`, only defines the method `apply(plan: TreeType): TreeType`. Because the body of such a method is always supposed to read `plan transform pf`, with `pf` being some `PartialFunction[LogicalPlan, LogicalPlan]`, we can conclude that `Rule[LogicalPlan]` might be substituted by a PartialFunction. I propose the following:

a) Introduce the utility method

def rule(pf: PartialFunction[LogicalPlan, LogicalPlan]): Rule[LogicalPlan] =
  new Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform pf
  }

b) Progressively replace the boilerplate-y object definitions; e.g.

object MyRewriteRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ... => ...
  }
}

with

// define a Rule[LogicalPlan]
val MyRewriteRule = rule {
  case ... => ...
}

It might also be possible to make the rule method `implicit`, thereby further reducing MyRewriteRule to:

// define a PartialFunction[LogicalPlan, LogicalPlan]
// the implicit would convert it into a Rule[LogicalPlan] at the use sites
val MyRewriteRule = {
  case ... => ...
}

## Strategies

A similar solution could be applied to shorten the code for Strategies, which are total functions only because they are all supposed to handle the default case, possibly returning `Nil`. In this case we might introduce the following utility methods:

/**
 * Generate a Strategy from a PartialFunction[LogicalPlan, SparkPlan].
 * The partial function must therefore return *one single* SparkPlan for each case.
 * The method will automatically wrap them in a [[Seq]].
 * Unhandled cases will automatically return Seq.empty.
 */
protected def rule(pf: PartialFunction[LogicalPlan, SparkPlan]): Strategy =
  new Strategy {
    def apply(plan: LogicalPlan): Seq[SparkPlan] =
      if (pf.isDefinedAt(plan)) Seq(pf.apply(plan)) else Seq.empty
  }

/**
 * Generate a Strategy from a PartialFunction[LogicalPlan, Seq[SparkPlan]].
 * The partial function must therefore return a Seq[SparkPlan] for each case.
 * Unhandled cases will automatically return Seq.empty.
 */
protected def seqrule(pf: PartialFunction[LogicalPlan, Seq[SparkPlan]]): Strategy =
  new Strategy {
    def apply(plan: LogicalPlan): Seq[SparkPlan] =
      if (pf.isDefinedAt(plan)) pf.apply(plan) else Seq.empty[SparkPlan]
  }

Thanks in advance,
e.v.
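As a self-contained illustration of the proposed sugar (plain Scala, no Catalyst dependency; note the real proposal applies pf recursively via plan transform, whereas this toy version applies it only at the top level):

trait Rule[T] { def apply(t: T): T }

// Wrap a PartialFunction as a Rule, falling back to identity when undefined.
def rule[T](pf: PartialFunction[T, T]): Rule[T] = new Rule[T] {
  def apply(t: T): T = if (pf.isDefinedAt(t)) pf(t) else t
}

val doubleEvens = rule[Int] { case n if n % 2 == 0 => n * 2 }
doubleEvens(4) // 8
doubleEvens(3) // 3 (unhandled case falls through unchanged)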
Re: [VOTE] Release Apache Spark 1.4.0 (RC1)
Signature, hashes, LICENSE/NOTICE, source tarball look OK. I built for Hadoop 2.6 (-Pyarn -Phive -Phadoop-2.6) on Ubuntu from source and tests pass. The release looks OK except that I'd like to resolve the Blockers before giving a +1.

I'm seeing some test failures, and wanted to cross-check with others. They're all in Hive. Some I think are due to Java 8 differences and are just test issues; they expect an exact output from a query plan, and some HashSet ordering differences make it trivially different. If so, I've seen this in the past and we could ignore it for now, but it would be good to get a second set of eyes. The trace is big so it's at the end.

When rerunning with Java 7 I get a different error due to Hive version support:

- success sanity check *** FAILED ***
java.lang.RuntimeException: [download failed: org.jboss.netty#netty;3.2.2.Final!netty.jar(bundle), download failed: commons-net#commons-net;3.1!commons-net.jar]
at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:972)
at org.apache.spark.sql.hive.client.IsolatedClientLoader$$anonfun$3.apply(IsolatedClientLoader.scala:62)
...

Hive / possible Java 8 test issue:

- windowing.q -- 20. testSTATs *** FAILED ***
Results do not match for windowing.q -- 20. testSTATs:

== Parsed Logical Plan ==
'WithWindowDefinition Map(w1 -> WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)
'Project ['p_mfgr,'p_name,'p_size,UnresolvedWindowExpression WindowSpecReference(w1) UnresolvedWindowFunction stddev UnresolvedAttribute [p_retailprice] AS sdev#159481,UnresolvedWindowExpression WindowSpecReference(w1) UnresolvedWindowFunction stddev_pop UnresolvedAttribute [p_retailprice] AS sdev_pop#159482,UnresolvedWindowExpression WindowSpecReference(w1) UnresolvedWindowFunction collect_set UnresolvedAttribute [p_size] AS uniq_size#159483,UnresolvedWindowExpression WindowSpecReference(w1) UnresolvedWindowFunction variance UnresolvedAttribute [p_retailprice] AS var#159484,UnresolvedWindowExpression WindowSpecReference(w1) UnresolvedWindowFunction corr UnresolvedAttribute [p_size] UnresolvedAttribute [p_retailprice] AS cor#159485,UnresolvedWindowExpression WindowSpecReference(w1) UnresolvedWindowFunction covar_pop UnresolvedAttribute [p_size] UnresolvedAttribute [p_retailprice] AS covarp#159486]
'UnresolvedRelation [part], None

== Analyzed Logical Plan ==
p_mfgr: string, p_name: string, p_size: int, sdev: double, sdev_pop: double, uniq_size: array<int>, var: double, cor: double, covarp: double
Project [p_mfgr#159489,p_name#159488,p_size#159492,sdev#159481,sdev_pop#159482,uniq_size#159483,var#159484,cor#159485,covarp#159486]
Window [p_mfgr#159489,p_name#159488,p_size#159492,p_retailprice#159494], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStd(p_retailprice#159494) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING AS sdev#159481,HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStd(p_retailprice#159494) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING AS sdev_pop#159482,HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectSet(p_size#159492) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING AS uniq_size#159483,HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFVariance(p_retailprice#159494) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING AS var#159484,HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCorrelation(p_size#159492,p_retailprice#159494) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING AS cor#159485,HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCovariance(p_size#159492,p_retailprice#159494) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING AS covarp#159486], WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING
Project [p_mfgr#159489,p_name#159488,p_size#159492,p_retailprice#159494]
MetastoreRelation default, part, None

== Optimized Logical Plan ==
Project [p_mfgr#159489,p_name#159488,p_size#159492,sdev#159481,sdev_pop#159482,uniq_size#159483,var#159484,cor#159485,covarp#159486]
Window [p_mfgr#159489,p_name#159488,p_size#159492,p_retailprice#159494], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStd(p_retailprice#159494) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING AS sdev#159481,HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStd(p_retailprice#159494) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING AS sdev_pop#159482,HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectSet(p_size#159492) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING AS uniq_size#159483,HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFVariance(p_retailprice#159494) WindowSpecDefinition ROWS BETWEEN 2
Re: Contribute code to MLlib
Hey Ram,

I'm not speaking to Tarek's package specifically but to the spirit of MLlib. There are a number of methods/algorithms for PCA, and I'm not sure by what criterion the current one is considered 'standard'. It is rare to find ANY machine learning algorithm that is 'clearly better' than any other. They are all tools; they have their place and time.

I agree that it makes sense to field new algorithms as packages and then integrate them into MLlib once they are 'proven' (in terms of stability/performance/anyone cares). That being said, if MLlib takes the stance that 'what we have is good enough unless something is *clearly* better', then it will never grow into a suite with the depth and richness of sklearn. From a practitioner's standpoint, it's nice to have everything I could ever want ready in an 'off-the-shelf' form.

'A large number of use cases better than existing' shouldn't be a criterion when selecting what to include in MLlib. The important question should be, 'Are you willing to take on responsibility for maintaining this, because you may be the only person on earth who understands the mechanics AND how to code it?'. Obviously we don't want any random junk algorithm included. But trying to say 'this way of doing PCA is better than that way in a large class of cases' is like trying to say 'geometry is more important than calculus in a large class of cases'; maybe it's true, but geometry won't help you if you are in a case where you need calculus.

This all relies on the assumption that MLlib is destined to be a rich data science/machine learning package. It may be that the goal is to make the project as lightweight and parsimonious as possible; if so, excuse me for speaking out of turn.

On Tue, May 19, 2015 at 10:41 AM, Ram Sriharsha sriharsha@gmail.com wrote:

Hi Trevor, Tarek

You can make non-standard algorithms (PCA or otherwise) available to users of Spark as Spark Packages: http://spark-packages.org https://databricks.com/blog/2014/12/22/announcing-spark-packages.html With the availability of Spark packages, adding powerful experimental / alternative machine learning algorithms to the pipeline has never been easier. I would suggest that route in scenarios where one machine learning algorithm is not clearly better in the common scenarios than an existing implementation in MLlib. If your algorithm is, for a large class of use cases, better than the existing PCA implementation, then we should open a JIRA and discuss the relative strengths/weaknesses (perhaps with some benchmarks) so we can better understand whether it makes sense to switch out the existing PCA implementation and make yours the default.

Ram

On Tue, May 19, 2015 at 6:56 AM, Trevor Grant trevor.d.gr...@gmail.com wrote:

There are most likely advantages and disadvantages to Tarek's algorithm against the current implementation, and different scenarios where each is more appropriate. Would we not offer multiple PCA algorithms and let the user choose?

Trevor

Trevor Grant
Data Scientist
*Fortunate is he, who is able to know the causes of things. -Virgil*

On Mon, May 18, 2015 at 4:18 PM, Joseph Bradley jos...@databricks.com wrote:

Hi Tarek,

Thanks for your interest and for checking the guidelines first! On 2 points:

Algorithm: PCA is of course a critical algorithm. The main question is how your algorithm/implementation differs from the current PCA. If it's different and potentially better, I'd recommend opening up a JIRA for explaining and discussing it.

Java/Scala: We really do require that algorithms be in Scala, for the sake of maintainability. The conversion should be doable if you're willing, since Scala is a pretty friendly language. If you create the JIRA, you could also ask for help there to see if someone can collaborate with you to convert the code to Scala.

Thanks! Joseph

On Mon, May 18, 2015 at 3:13 AM, Tarek Elgamal tarek.elga...@gmail.com wrote:

Hi, I would like to contribute an algorithm to the MLlib project. I have implemented a scalable PCA algorithm on Spark. It is scalable for both tall and fat matrices, and the paper around it has been accepted for publication at the SIGMOD 2015 conference. I looked at the guidelines in the following link: https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-MLlib-specificContributionGuidelines I believe that most of the guidelines apply in my case; however, the code is written in Java, and it was not clear from the guidelines whether the MLlib project accepts Java code or not. My algorithm can be found under this repository: https://github.com/Qatar-Computing-Research-Institute/sPCA Any help on how to make it suitable for the MLlib project will be greatly appreciated.

Best Regards,
Tarek Elgamal
Re: userClassPathFirst and loader constraint violation
Hmm... this seems to be particular to logging (KafkaRDD.scala:89 in my tree is a log statement). I'd expect KafkaRDD to be loaded from the system class loader -- or are you repackaging it in your app?

I'd have to investigate more to come up with an accurate explanation here... but it seems that the initialization of the logging system, which happens after SparkSubmit runs and sets the context class loader to be an instance of ChildFirstURLClassLoader, is causing things to blow up. I'll see if I can spend some cycles coming up with a proper explanation (and hopefully a fix or workaround). For now, you could probably avoid this by not repackaging the logging dependencies in your app.

-- Marcelo
Re: Contribute code to MLlib
Hi Trevor,

Good point. I didn't mean that some algorithm has to be clearly better than another in every scenario to be included in MLlib. However, even if someone is willing to be the maintainer of a piece of code, it does not make sense to accept every possible algorithm into the core library. That said, the specific algorithms should be discussed in the JIRA: as you point out, there is no clear way to decide what algorithm to include and what not to, and usually mature algorithms that serve a wide variety of scenarios are easier to argue about, but nothing prevents anyone from opening a ticket to discuss any specific machine learning algorithm.

My suggestion was simply that for the purpose of making experimental or newer algorithms available to Spark users, it doesn't necessarily have to be in the core library. Spark packages are good enough in this respect. Isn't it better for newer algorithms to take this route and prove themselves before we bring them into the core library? Especially given that the barrier to using Spark packages is very low.

Ram
Re: Contribute code to MLlib
Hi Trevor,

I'm attaching the MLlib contribution guideline here: https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-MLlib-specificContributionGuidelines

It speaks to widely known and accepted algorithms, but not to whether an algorithm has to be better than another in every scenario, etc. I think the guideline explains what a good contribution to the core library should look like better than I initially attempted to!

Sent from my iPhone
IndexedRowMatrix semantics
Hi, IndexedRowMatrix and RowMatrix both take an RDD[Vector]... is it possible for it to have intermixed dense and sparse vectors? Basically, I am considering a gemv flow when the IndexedRowMatrix has the dense flag true, and a dot flow otherwise... Thanks. Deb
Re: IndexedRowMatrix semantics
I believe it works with a mix of DenseVector and SparseVector types.

Joseph
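For what it's worth, a quick sketch of constructing such a mixed matrix (assuming an existing SparkContext sc):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

val rows = sc.parallelize(Seq(
  IndexedRow(0L, Vectors.dense(1.0, 0.0, 2.0)),            // dense row
  IndexedRow(1L, Vectors.sparse(3, Array(1), Array(3.0)))  // sparse row
))
val mat = new IndexedRowMatrix(rows) // rows of mixed vector types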
Re: Spark Streaming - Design considerations/Knobs
Correcting the ones that are incorrect or incomplete. BUT this is a good list of things to remember about Spark Streaming.

On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat hemant9...@gmail.com wrote:

Hi, I have compiled a list (from online sources) of knobs/design considerations that need to be taken care of by applications running on Spark Streaming. Is my understanding correct? Are there any other important design considerations I should take care of?

- A DStream is associated with a single receiver. To attain read parallelism, multiple receivers, i.e. multiple DStreams, need to be created.
- A receiver runs within an executor and occupies one core. Ensure that there are enough cores for processing after the receiver slots are booked, i.e. spark.cores.max should take the receiver slots into account.
- Receivers are allocated to executors in a round-robin fashion.
- When data is received from a stream source, the receiver creates blocks of data. A new block of data is generated every blockInterval milliseconds. N blocks of data are created during the batchInterval, where N = batchInterval/blockInterval.
- These blocks are distributed by the BlockManager of the current executor to the block managers of other executors. After that, the Network Input Tracker running on the driver is informed about the block locations for further processing.
- An RDD is created on the driver for the blocks created during the batchInterval. The blocks generated during the batchInterval are the partitions of the RDD, and each partition is a task in Spark. blockInterval == batchInterval would mean that a single partition is created, and it is probably processed locally.

The map tasks on the blocks are processed in the executors (one that received the block, and another where the block was replicated) that have the blocks, irrespective of block interval, unless non-local scheduling kicks in (as you observed next).

- A bigger blockInterval means bigger blocks. A high value of spark.locality.wait increases the chance of processing a block on the local node. A balance needs to be found between these two parameters to ensure that the bigger blocks are processed locally.
- Instead of relying on batchInterval and blockInterval, you can define the number of partitions by calling dstream.repartition(n). This reshuffles the data in the RDD randomly to create n partitions.

Yes, for greater parallelism. Though it comes at the cost of a shuffle.

- An RDD's processing is scheduled by the driver's JobScheduler as a job. At a given point of time only one job is active, so if one job is executing the other jobs are queued.
- If you have two DStreams there will be two RDDs formed, and two jobs will be created which are scheduled one after the other.
- To avoid this, you can union the two DStreams. This ensures that a single UnionRDD is formed from the two RDDs of the DStreams, and that UnionRDD is then processed as a single job. The partitioning of the RDDs is not impacted, however.

To further clarify, the jobs depend on the number of output operations (print, foreachRDD, saveAsXFiles) and the number of RDD actions in those output operations.

dstream1.union(dstream2).foreachRDD { rdd => rdd.count() } // one Spark job per batch
dstream1.union(dstream2).foreachRDD { rdd => { rdd.count(); rdd.count() } } // TWO Spark jobs per batch
dstream1.foreachRDD { rdd => rdd.count() }; dstream2.foreachRDD { rdd => rdd.count() } // TWO Spark jobs per batch

- If the batch processing time is more than the batchInterval, then obviously the receiver's memory will start filling up, which ends up throwing exceptions (most probably BlockNotFoundException). Currently there is no way to pause the receiver.

You can limit the rate of the receiver using the SparkConf config spark.streaming.receiver.maxRate.

- To be fully fault tolerant, Spark Streaming needs checkpointing enabled. Checkpointing increases the batch processing time.

Incomplete. There are two types of checkpointing - data and metadata. Only data checkpointing, needed by only some operations, increases batch processing time. Read: http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing Furthermore, with checkpointing you can recover the computation, but you may lose some data (that was received but not processed before the driver failed) for some sources. Enabling write ahead logs and a reliable source + receiver allows zero data loss. Read about the WAL in http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics

- The frequency of metadata checkpoint cleaning can be controlled using spark.cleaner.ttl. But data checkpoint cleaning happens automatically when the RDDs in the checkpoint are no longer required.

Incorrect. metadata checkpointing or
Re: Contribute code to MLlib
Hi Trevor,

I may be repeating what Ram said, but to second it, a few points:

We do want MLlib to become an extensive and rich ML library; as you said, scikit-learn is a great example. To make that happen, we of course need to include important algorithms. "Important" is hazy, but roughly means being useful to a large number of users, improving a large number of use cases (above what is currently available), and being well-established and tested.

Others and I may not be familiar with Tarek's algorithm (since it is so new), so it will be important to discuss details on JIRA to establish the cases in which the algorithm improves over the current PCA. That may require discussion, community testing, etc. If we establish that it is a clear improvement in a large domain, then it could be valuable to have in MLlib proper. It's always going to be hard to tell where to draw the line, so less common algorithms will require more testing before we commit to including them in MLlib.

I like the Spark package suggestion, since it would allow users to immediately start using the code while the discussion on JIRA happens. (Plus, if package users find it useful, they can report that on the JIRA.)

Joseph
Re: Representing a recursive data type in Spark SQL
Hi Jeremy:

Row is a collection of 'Any', so it can be used as a recursive data type. Is this what you were looking for? Example:

val x = sc.parallelize(Array.range(0, 10)).map(x => Row(Row(x), Row(x.toString)))

Rakesh
Re: Representing a recursive data type in Spark SQL
Hey Rakesh,

To clarify, what I was referring to is when doing something like this:

sqlContext.applySchema(rdd, mySchema)

mySchema must be a well-defined StructType, which presently does not allow for a recursive type.
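To make the limitation concrete, here is a sketch of a tree-like schema one might want to declare (field names are hypothetical); the recursive reference is exactly what StructType cannot express, so a stand-in element type is used:

import org.apache.spark.sql.types._

// Desired: "children" should be ArrayType(<this same struct>), but a
// StructType cannot refer to itself, so a placeholder type is needed.
val nodeSchema = StructType(Seq(
  StructField("value", StringType, nullable = true),
  StructField("children", ArrayType(StringType), nullable = true) // stand-in for the recursive type
))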
Representing a recursive data type in Spark SQL
Spark SQL has proven to be quite useful in applying a partial schema to large JSON logs and being able to write plain SQL to perform a wide variety of operations over this data. However, one small thing that keeps coming back to haunt me is the lack of support for recursive data types, whereby a member of a complex/struct value can be of the same type as the complex/struct value itself. I am hoping someone may be able to point me in the right direction of where to start to build out such capabilities, as I'd be happy to contribute, but am very new to this particular component of the Spark project.
Low throughput and effect of GC in SparkSql GROUP BY
Hi,

Somewhat similar to Daniel Mescheder's mail yesterday on Spark SQL, I have a data point regarding the performance of GROUP BY, indicating there's excessive GC and it's impacting throughput. I want to know if the new memory manager for aggregations (https://github.com/apache/spark/pull/5725/) is going to address this kind of issue.

I only have a small amount of data on each node (~360 MB) with a large heap size (18 GB). I still see 2-3 minor collections happening whenever I do a SELECT SUM() with a GROUP BY. I have tried different sizes for the Young Generation without much effect, though not different GC algorithms (Hm... I ought to try reducing the RDD storage fraction, perhaps). I have made a chart of my results [1] by adding timing code to Aggregates.scala. The query is actually Query 2 from Berkeley's AmpLab benchmark, running over 10 million records. The chart is from one of the 4 worker nodes in the cluster.

I am trying to square this with a claim in the Project Tungsten blog post [2]: "When profiling Spark user applications, we’ve found that a large fraction of the CPU time is spent waiting for data to be fetched from main memory." Am I correct in assuming that Spark SQL is yet to reach that level of efficiency, at least in aggregation operations?

Thanks.

[1] https://docs.google.com/spreadsheets/d/1HSqYfic3n5s9i4Wsi1Qg0FKN_AWz2vV7_6RRMrtzplQ/edit#gid=481134174
[2] https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html

Pramod
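For reference, AmpLab benchmark Query 2 is a grouped sum of roughly this shape (the substring length varies by query variant; an existing sqlContext with a registered uservisits table is assumed):

val result = sqlContext.sql(
  "SELECT SUBSTR(sourceIP, 1, 8), SUM(adRevenue) FROM uservisits GROUP BY SUBSTR(sourceIP, 1, 8)")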
Re: Low throughput and effect of GC in SparkSql GROUP BY
Does this turn codegen on? I think the performance is fairly different when codegen is turned on. For 1.5, we are investigating having codegen on by default, so users get much better performance out of the box.
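For anyone following along, in the 1.3/1.4 line codegen is toggled with the spark.sql.codegen flag -- a sketch, assuming an existing sqlContext:

sqlContext.setConf("spark.sql.codegen", "true")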
Re: Performance Memory Issues When Creating Many Columns in GROUP BY (spark-sql)
It is a lot of columns, but I'm not sure if that's why it is running out of memory. In Spark SQL, we are not yet doing external aggregation when the number of keys is large in the aggregation hashmap. We will fix this and have external aggregation in 1.5.

On Tue, May 19, 2015 at 2:43 AM, daniel.mescheder daniel.mesche...@realimpactanalytics.com wrote:

Dear List,

We have run into serious problems trying to run a larger-than-average number of aggregations in a GROUP BY query. Symptoms of this problem are OutOfMemory exceptions and unreasonably long processing times due to GC. The problem occurs when the following two conditions are met:

- The number of groups is relatively large (growing with the size of the dataset)
- The number of columns is relatively large

To reproduce, paste the following gist into your spark-shell (I'm running 1.3.1): https://gist.github.com/DanielMe/9467bb0d9ad3aa639429

This example is relatively small in size:

- The size of the input is 10^6 * 64 bit = 8 MB
- The size of the output should be around 3 * 10^8 * 64 bit = 2.4 GB
- The aggregations themselves are just count(1) and hence not so difficult to compute

I am running this on a cluster with three 61 GB worker machines and an equally equipped master with the following spark-defaults.conf:

spark.executor.memory=55g
spark.driver.memory=55g

The result: the workers will choke with "java.lang.OutOfMemoryError: GC overhead limit exceeded". In fact, if you play with the num_columns parameter, you should observe an unreasonable amount of time spent on GC even for lower values. If you run this on a desktop machine, low values of num_columns should already lead to OOM crashes.

My questions are:

- What causes this behaviour?
- Can/should Catalyst be able to automatically optimize queries of this kind to run in reasonable time, or at least not crash?
- What are possible workarounds to achieve the desired effect? (Even if that means not using DataFrames but going down to the raw RDD level.)

Our preliminary analysis of the situation concluded that what is blowing up is in fact the hashTable in Aggregate::doExecute, which will try to store the cross product of groups and columns on each partition. In fact, we managed to mitigate the issue a bit by:

- reducing the size of the partitions (which will make these hash tables smaller), and
- pre-partitioning the data using a HashPartitioner on the key (which will reduce the number of different groups per partition).

The latter actually seems to be a sensible thing to do whenever num_columns * num_groups > num_rows, because in this setting the amount of data we have to shuffle around after the first aggregation step is actually larger than the amount of data we had initially. Could this be something that Catalyst should take into account when creating a physical plan?

Thanks in advance.

Kind regards,
Daniel
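A sketch of the pre-partitioning workaround Daniel describes, at the raw RDD level (the key, values, and partition count are illustrative):

import org.apache.spark.{HashPartitioner, SparkContext}

val sc: SparkContext = ??? // an existing context is assumed

val keyed = sc.parallelize(0 until 1000000).map(i => (i % 300000, 1L))

// Hash-partition by the GROUP BY key first, so each partition holds fewer
// distinct groups and the per-partition aggregation hash table stays small.
val prePartitioned = keyed.partitionBy(new HashPartitioner(200))
val counts = prePartitioned.reduceByKey(_ + _)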
Re: [VOTE] Release Apache Spark 1.4.0 (RC1)
-1: discovered I accidentally removed the master/worker JSON endpoints, will restore: https://issues.apache.org/jira/browse/SPARK-7760

On Tue, May 19, 2015 at 11:10 AM, Patrick Wendell pwend...@gmail.com wrote:

Please vote on releasing the following candidate as Apache Spark version 1.4.0!

The tag to be voted on is v1.4.0-rc1 (commit 777a081): https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=777a08166f1fb144146ba32581d4632c3466541e

The release files, including signatures, digests, etc. can be found at: http://people.apache.org/~pwendell/spark-1.4.0-rc1/

Release artifacts are signed with the following key: https://people.apache.org/keys/committer/pwendell.asc

The staging repository for this release can be found at: https://repository.apache.org/content/repositories/orgapachespark-1092/

The documentation corresponding to this release can be found at: http://people.apache.org/~pwendell/spark-1.4.0-rc1-docs/

Please vote on releasing this package as Apache Spark 1.4.0! The vote is open until Friday, May 22, at 17:03 UTC and passes if a majority of at least 3 +1 PMC votes are cast.

[ ] +1 Release this package as Apache Spark 1.4.0
[ ] -1 Do not release this package because ...

To learn more about Apache Spark, please see http://spark.apache.org/

== How can I help test this release? ==
If you are a Spark user, you can help us test this release by taking a Spark 1.3 workload and running on this release candidate, then reporting any regressions.

== What justifies a -1 vote for this release? ==
This vote is happening towards the end of the 1.4 QA period, so -1 votes should only occur for significant regressions from 1.3.1. Bugs already present in 1.3.X, minor regressions, or bugs related to new features will not block this release.