Merging code into branch 1.3
Hey Committers, now that Spark 1.3 RC1 is cut, please restrict branch-1.3 merges to the following:
1. Fixes for issues blocking the 1.3 release (i.e., 1.2.X regressions).
2. Documentation and tests.
3. Fixes for non-blocker issues that are surgical, low-risk, and/or outside of the core.
If there is a lower-priority bug fix (a non-blocker) that requires nontrivial code changes, do not merge it into 1.3. If something seems borderline, feel free to reach out to me and we can work through it together. This is what we've done for the last few releases to make sure RCs become progressively more stable, and it is important for helping us cut timely releases. Thanks! - Patrick
[VOTE] Release Apache Spark 1.3.0 (RC1)
Please vote on releasing the following candidate as Apache Spark version 1.3.0!
The tag to be voted on is v1.3.0-rc1 (commit f97b0d4a): https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=f97b0d4a6b26504916816d7aefcf3132cd1da6c2
The release files, including signatures, digests, etc. can be found at: http://people.apache.org/~pwendell/spark-1.3.0-rc1/
Release artifacts are signed with the following key: https://people.apache.org/keys/committer/pwendell.asc
The staging repository for this release can be found at: https://repository.apache.org/content/repositories/orgapachespark-1069/
The documentation corresponding to this release can be found at: http://people.apache.org/~pwendell/spark-1.3.0-rc1-docs/
Please vote on releasing this package as Apache Spark 1.3.0! The vote is open until Saturday, February 21, at 08:03 UTC and passes if a majority of at least 3 +1 PMC votes are cast.
[ ] +1 Release this package as Apache Spark 1.3.0
[ ] -1 Do not release this package because ...
To learn more about Apache Spark, please see http://spark.apache.org/
== How can I help test this release? ==
If you are a Spark user, you can help us test this release by taking a Spark 1.2 workload, running it on this release candidate, and reporting any regressions.
== What justifies a -1 vote for this release? ==
This vote is happening towards the end of the 1.3 QA period, so -1 votes should only occur for significant regressions from 1.2.1. Bugs already present in 1.2.X, minor regressions, and bugs related to new features will not block this release.
- Patrick
Re: Replacing Jetty with Tomcat
I do not think it makes sense to make the web server configurable. Mostly because there's no real problem in running an HTTP service internally based on Jetty while you run your own HTTP service based on something else like Tomcat. What's the problem? On Wed, Feb 18, 2015 at 3:14 AM, Niranda Perera niranda.per...@gmail.com wrote: Hi Sean, The main issue we have is running two web servers in a single product. We think it would not be an elegant solution. Could you please point me to the main areas where the Jetty server is tightly coupled, or extension points where I could plug in Tomcat instead of Jetty? If successful, I could contribute it to the Spark project. :-) cheers On Mon, Feb 16, 2015 at 4:51 PM, Sean Owen so...@cloudera.com wrote: There's no particular reason you have to remove the embedded Jetty server, right? It doesn't prevent you from using it inside another app that happens to run in Tomcat. You won't be able to switch it out without rewriting a fair bit of code, no, but you don't need to. On Mon, Feb 16, 2015 at 5:08 AM, Niranda Perera niranda.per...@gmail.com wrote: Hi, We are thinking of integrating the Spark server inside a product. Our current product uses Tomcat as its web server. Is it possible to switch the Jetty web server in Spark to Tomcat off-the-shelf? Cheers -- Niranda
Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
The serializer is created with
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
which is definitely not the closure serializer, and so should respect what you are setting with spark.serializer. Maybe you can do a quick bit of debugging to see where that assumption breaks down? Like, are you sure spark.serializer is set everywhere? On Wed, Feb 18, 2015 at 4:31 AM, Matt Cheah mch...@palantir.com wrote: Hi everyone, I was using JavaPairRDD’s combineByKey() to compute all of my aggregations before, since I assumed that every aggregation required a key. However, I realized I could do my analysis using JavaRDD’s aggregate() instead and not use a key. I have set spark.serializer to use Kryo. As a result, JavaRDD’s combineByKey requires that a “createCombiner” function is provided, and the return value from that function must be serializable using Kryo. When I switched to using rdd.aggregate I assumed that the zero value would also be strictly Kryo serialized, as it is a data item and not part of a closure or the aggregation functions. However, I got a serialization exception as the closure serializer (only valid serializer is the Java serializer) was used instead. I was wondering the following: What is the rationale for making the zero value be serialized using the closure serializer? This isn’t part of the closure, but is an initial data item. Would it make sense for us to perhaps write a version of rdd.aggregate() that takes a function as a parameter, that generates the zero value? This would be more intuitive to be serialized using the closure serializer. I believe aggregateByKey is also affected. Thanks, -Matt Cheah
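For readers following along, the failure mode Matt describes can be pinned down with a small reproduction along these lines (a sketch, not a test from this thread; KryoOnlyZero is a hypothetical class, and the exception assumes the 1.2.x behavior in which rdd.aggregate clones the zero value with the Java closure serializer):

  import org.apache.spark.{SparkConf, SparkContext}

  // Hypothetical zero-value type: fine for Kryo, but not java.io.Serializable,
  // so the Java (closure) serializer cannot clone it.
  class KryoOnlyZero(var sum: Long)

  val conf = new SparkConf()
    .setAppName("aggregate-zero-value-repro")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  val sc = new SparkContext(conf)

  val rdd = sc.parallelize(1 to 100, 4)
  // Expected to throw a NotSerializableException on 1.2.x when the zero
  // value is cloned, even though spark.serializer points at Kryo.
  val total = rdd.aggregate(new KryoOnlyZero(0L))(
    (z, x) => { z.sum += x; z },
    (a, b) => { a.sum += b.sum; a })
  println(total.sum)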
Re: [VOTE] Release Apache Spark 1.3.0 (RC1)
> UISeleniumSuite: *** RUN ABORTED *** java.lang.NoClassDefFoundError: org/w3c/dom/ElementTraversal ...
This is a newer test suite. There is something flaky about it; we should definitely fix it. IMO it's not a blocker though.
> Patrick this link gives a 404: https://people.apache.org/keys/committer/pwendell.asc
Works for me. Maybe it's some ephemeral issue?
> Finally, I already realized I failed to get the fix for https://issues.apache.org/jira/browse/SPARK-5669 correct, and that has to be correct for the release. I'll patch that up straight away, sorry. I believe the result of the intended fix is still as I described in SPARK-5669, so there is no bad news there. A local test seems to confirm it and I'm waiting on Jenkins. If it's all good I'll merge that fix. So, that much will need a new release, I apologize.
Thanks for finding this. I'm going to leave this open for continued testing...
Re: quick jenkins restart tomorrow morning, ~7am PST
i'm actually going to do this now -- it's really quiet today. there are two spark pull request builds running, which i will kill and retrigger once jenkins is back up: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27689/ https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27690/ On Wed, Feb 18, 2015 at 12:55 PM, shane knapp skn...@berkeley.edu wrote: i'll be kicking jenkins to up the open file limits on the workers. it should be a very short downtime, and i'll post updates on my progress tomorrow. shane
quick jenkins restart tomorrow morning, ~7am PST
i'll be kicking jenkins to up the open file limits on the workers. it should be a very short downtime, and i'll post updates on my progress tomorrow. shane
Re: Streaming partitions to driver for use in .toLocalIterator
This would be pretty tricky to do -- the issue is that right now sparkContext.runJob has you pass in a function from a partition to *one* result object that gets serialized and sent back: Iterator[T] => U, and that idea is baked pretty deep into a lot of the internals: DAGScheduler, Task, Executors, etc. Maybe another possibility worth considering: should we make it easy to go from N partitions to 2N partitions (or any other multiple, obviously) without requiring a shuffle? For that matter, you should also be able to go from 2N to N without a shuffle as well. That change is also somewhat involved, though. Both are in theory possible, but I imagine they'd need really compelling use cases. An alternative would be to write your RDD to some other data store (e.g., HDFS) which has better support for reading data in a streaming fashion, though you would probably be unhappy with the overhead. On Wed, Feb 18, 2015 at 9:09 AM, Andrew Ash and...@andrewash.com wrote: Hi Spark devs, I'm creating a streaming export functionality for RDDs and am having some trouble with large partitions. The RDD.toLocalIterator() call pulls over a partition at a time to the driver, and then streams the RDD out from that partition before pulling in the next one. When you have large partitions though, you can OOM the driver, especially when multiple of these exports are happening in the same SparkContext. One idea I had was to repartition the RDD so partitions are smaller, but it's hard to know a priori what the partition count should be, and I'd like to avoid paying the shuffle cost if possible -- I think repartitioning to a higher partition count forces a shuffle. Is it feasible to rework this so the executor -> driver transfer in .toLocalIterator is a steady stream rather than a partition at a time? Thanks! Andrew
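For context, this is roughly the shape of RDD.toLocalIterator in 1.x (a paraphrase, not the exact source): each partition is fetched through runJob with an Iterator[T] => Array[T] function, which is why a whole partition is materialized on the driver at once.

  // Paraphrased sketch of the partition-at-a-time behavior under discussion.
  def toLocalIterator: Iterator[T] = {
    def collectPartition(p: Int): Array[T] = {
      // One job per partition: the partition is turned into an Array on the
      // executor and shipped back to the driver as a single result object.
      sc.runJob(this, (iter: Iterator[T]) => iter.toArray,
                Seq(p), allowLocal = false).head
    }
    (0 until partitions.length).iterator.flatMap(i => collectPartition(i))
  }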
Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
But RDD.aggregate() has this code:
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
I do see the SparkEnv.get.serializer used in aggregateByKey however. Perhaps we just missed it and need to apply the change to aggregate()? It seems appropriate to target a fix for 1.3.0. -Matt Cheah
From: Josh Rosen rosenvi...@gmail.com Date: Wednesday, February 18, 2015 at 6:12 AM To: Matt Cheah mch...@palantir.com Cc: dev@spark.apache.org dev@spark.apache.org, Mingyu Kim m...@palantir.com, Andrew Ash a...@palantir.com Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
It looks like this was fixed in https://issues.apache.org/jira/browse/SPARK-4743 / https://github.com/apache/spark/pull/3605. Can you see whether that patch fixes this issue for you? On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah mch...@palantir.com wrote: Hi everyone, I was using JavaPairRDD's combineByKey() to compute all of my aggregations before, since I assumed that every aggregation required a key. However, I realized I could do my analysis using JavaRDD's aggregate() instead and not use a key. I have set spark.serializer to use Kryo. As a result, JavaRDD's combineByKey requires that a "createCombiner" function is provided, and the return value from that function must be serializable using Kryo. When I switched to using rdd.aggregate I assumed that the zero value would also be strictly Kryo serialized, as it is a data item and not part of a closure or the aggregation functions. However, I got a serialization exception as the closure serializer (only valid serializer is the Java serializer) was used instead. I was wondering the following: 1. What is the rationale for making the zero value be serialized using the closure serializer? This isn't part of the closure, but is an initial data item. 2. Would it make sense for us to perhaps write a version of rdd.aggregate() that takes a function as a parameter, that generates the zero value? This would be more intuitive to be serialized using the closure serializer. I believe aggregateByKey is also affected. Thanks, -Matt Cheah
If job fails shuffle space is not cleaned
Hi, Some of my jobs failed due to "no space left on device", and on those jobs I was monitoring the shuffle space... when the job failed, the shuffle space was not cleaned up and I had to clean it manually. Is there a JIRA already tracking this issue? If no one has been assigned to it, I can take a look. Thanks. Deb
Re: [VOTE] Release Apache Spark 1.3.0 (RC1)
On OS X and Ubuntu I see the following test failure in the source release for 1.3.0-RC1: UISeleniumSuite: *** RUN ABORTED *** java.lang.NoClassDefFoundError: org/w3c/dom/ElementTraversal ... Patrick this link gives a 404: https://people.apache.org/keys/committer/pwendell.asc Finally, I already realized I failed to get the fix for https://issues.apache.org/jira/browse/SPARK-5669 correct, and that has to be correct for the release. I'll patch that up straight away, sorry. I believe the result of the intended fix is still as I described in SPARK-5669, so there is no bad news there. A local test seems to confirm it and I'm waiting on Jenkins. If it's all good I'll merge that fix. So, that much will need a new release, I apologize. Please keep testing anyway! Otherwise, I verified the signatures are correct, licenses are correct, compiles on OS X and Ubuntu 14. On Wed, Feb 18, 2015 at 8:12 AM, Patrick Wendell pwend...@gmail.com wrote: Please vote on releasing the following candidate as Apache Spark version 1.3.0! The tag to be voted on is v1.3.0-rc1 (commit f97b0d4a): https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=f97b0d4a6b26504916816d7aefcf3132cd1da6c2 The release files, including signatures, digests, etc. can be found at: http://people.apache.org/~pwendell/spark-1.3.0-rc1/ Release artifacts are signed with the following key: https://people.apache.org/keys/committer/pwendell.asc The staging repository for this release can be found at: https://repository.apache.org/content/repositories/orgapachespark-1069/ The documentation corresponding to this release can be found at: http://people.apache.org/~pwendell/spark-1.3.0-rc1-docs/ Please vote on releasing this package as Apache Spark 1.3.0! The vote is open until Saturday, February 21, at 08:03 UTC and passes if a majority of at least 3 +1 PMC votes are cast. [ ] +1 Release this package as Apache Spark 1.3.0 [ ] -1 Do not release this package because ... To learn more about Apache Spark, please see http://spark.apache.org/ == How can I help test this release? == If you are a Spark user, you can help us test this release by taking a Spark 1.2 workload and running on this release candidate, then reporting any regressions. == What justifies a -1 vote for this release? == This vote is happening towards the end of the 1.3 QA period, so -1 votes should only occur for significant regressions from 1.2.1. Bugs already present in 1.2.X, minor regressions, or bugs related to new features will not block this release. - Patrick
Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
It looks like this was fixed in https://issues.apache.org/jira/browse/SPARK-4743 / https://github.com/apache/spark/pull/3605. Can you see whether that patch fixes this issue for you? On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah mch...@palantir.com wrote: Hi everyone, I was using JavaPairRDD’s combineByKey() to compute all of my aggregations before, since I assumed that every aggregation required a key. However, I realized I could do my analysis using JavaRDD’s aggregate() instead and not use a key. I have set spark.serializer to use Kryo. As a result, JavaRDD’s combineByKey requires that a “createCombiner” function is provided, and the return value from that function must be serializable using Kryo. When I switched to using rdd.aggregate I assumed that the zero value would also be strictly Kryo serialized, as it is a data item and not part of a closure or the aggregation functions. However, I got a serialization exception as the closure serializer (only valid serializer is the Java serializer) was used instead. I was wondering the following: 1. What is the rationale for making the zero value be serialized using the closure serializer? This isn’t part of the closure, but is an initial data item. 2. Would it make sense for us to perhaps write a version of rdd.aggregate() that takes a function as a parameter, that generates the zero value? This would be more intuitive to be serialized using the closure serializer. I believe aggregateByKey is also affected. Thanks, -Matt Cheah
Issue SPARK-5008 (persistent-hdfs broken)
I've recently run into the problem described in ticket SPARK-5008: https://issues.apache.org/jira/browse/SPARK-5008 This seems like quite a serious regression in 1.2.0, meaning that it's not really possible to use persistent-hdfs. The config for the persistent-hdfs points to the wrong part of the filesystem, so it comes up on the wrong volume (and therefore has the wrong capacity). I'm working around it with symlinks, but it's not ideal. It doesn't look like it's scheduled to be fixed in any particular release. Is there any indication of whether this is on anyone's todo list? If no one's looking into it then I could try having a look myself, but I'm not (yet) familiar with the internals. From the discussion on the ticket it doesn't look like a huge fix. Cheers Joe
Streaming partitions to driver for use in .toLocalIterator
Hi Spark devs, I'm creating a streaming export functionality for RDDs and am having some trouble with large partitions. The RDD.toLocalIterator() call pulls over a partition at a time to the driver, and then streams the RDD out from that partition before pulling in the next one. When you have large partitions though, you can OOM the driver, especially when multiple of these exports are happening in the same SparkContext. One idea I had was to repartition the RDD so partitions are smaller, but it's hard to know a priori what the partition count should be, and I'd like to avoid paying the shuffle cost if possible -- I think repartitioning to a higher partition count forces a shuffle. Is it feasible to rework this so the executor -> driver transfer in .toLocalIterator is a steady stream rather than a partition at a time? Thanks! Andrew
Re: [ml] Lost persistence for fold in crossvalidation.
Now in JIRA form: https://issues.apache.org/jira/browse/SPARK-5844
On Tue, Feb 17, 2015 at 3:12 PM, Xiangrui Meng men...@gmail.com wrote: There are three different regParams defined in the grid and there are three folds. For simplicity, we didn't split the dataset into three parts and reuse them; we do the split for each fold. Then we need to cache 3*3 times. Note that the pipeline API is not yet optimized for performance. It would be nice to optimize its performance in 1.4. -Xiangrui
On Wed, Feb 11, 2015 at 11:13 AM, Peter Rudenko petro.rude...@gmail.com wrote: Hi, I have a problem. I'm using Spark 1.2 with Pipeline + GridSearch + LogisticRegression. I've reimplemented the LogisticRegression.fit method and commented out instances.unpersist():

  override def fit(dataset: SchemaRDD, paramMap: ParamMap): LogisticRegressionModel = {
    println(s"Fitting dataset ${dataset.take(1000).toSeq.hashCode()} with ParamMap $paramMap.")
    transformSchema(dataset.schema, paramMap, logging = true)
    import dataset.sqlContext._
    val map = this.paramMap ++ paramMap
    val instances = dataset.select(map(labelCol).attr, map(featuresCol).attr)
      .map { case Row(label: Double, features: Vector) => LabeledPoint(label, features) }
    if (instances.getStorageLevel == StorageLevel.NONE) {
      println("Instances not persisted")
      instances.persist(StorageLevel.MEMORY_AND_DISK)
    }
    val lr = (new LogisticRegressionWithLBFGS)
      .setValidateData(false)
      .setIntercept(true)
    lr.optimizer
      .setRegParam(map(regParam))
      .setNumIterations(map(maxIter))
    val lrm = new LogisticRegressionModel(this, map, lr.run(instances).weights)
    // instances.unpersist()
    // copy model params
    Params.inheritValues(map, this, lrm)
    lrm
  }

CrossValidator feeds the same SchemaRDD for each parameter (same hash code), but somewhere the cache is being flushed. There is enough memory. Here's the output:

  Fitting dataset 2051470010 with ParamMap { DRLogisticRegression-f35ae4d3-regParam: 0.1 }.
  Instances not persisted
  Fitting dataset 2051470010 with ParamMap { DRLogisticRegression-f35ae4d3-regParam: 0.01 }.
  Instances not persisted
  Fitting dataset 2051470010 with ParamMap { DRLogisticRegression-f35ae4d3-regParam: 0.001 }.
  Instances not persisted
  Fitting dataset 802615223 with ParamMap { DRLogisticRegression-f35ae4d3-regParam: 0.1 }.
  Instances not persisted
  Fitting dataset 802615223 with ParamMap { DRLogisticRegression-f35ae4d3-regParam: 0.01 }.
  Instances not persisted

I have 3 parameters in GridSearch and 3 folds for CrossValidation:

  val paramGrid = new ParamGridBuilder()
    .addGrid(model.regParam, Array(0.1, 0.01, 0.001))
    .build()
  crossval.setEstimatorParamMaps(paramGrid)
  crossval.setNumFolds(3)

I assume that the data should be read and cached 3 times ((1 to numFolds).combinations(2)) and be independent of the number of parameters. But the data is read and cached 9 times. Thanks, Peter Rudenko
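One mitigation to try in the meantime (a sketch against Peter's setup above, not a verified fix; dataset and crossval are the names from his code): persist the input once before handing it to CrossValidator, so each fold's split recomputes from cache rather than re-reading the source for every fold/parameter combination.

  import org.apache.spark.storage.StorageLevel

  // Cache the parent dataset once, outside CrossValidator, so the
  // per-fold splits recompute from memory rather than from the source.
  dataset.persist(StorageLevel.MEMORY_AND_DISK)
  val cvModel = crossval.fit(dataset)
  dataset.unpersist()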
Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
That looks, at the least, inconsistent. As far as I know this should be changed so that the zero value is always cloned via the non-closure serializer. Any objection to that? On Wed, Feb 18, 2015 at 10:28 PM, Matt Cheah mch...@palantir.com wrote: But RDD.aggregate() has this code: // Clone the zero value since we will also be serializing it as part of tasks var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance()) I do see the SparkEnv.get.serializer used in aggregateByKey however. Perhaps we just missed it and need to apply the change to aggregate()? It seems appropriate to target a fix for 1.3.0. -Matt Cheah From: Josh Rosen rosenvi...@gmail.com Date: Wednesday, February 18, 2015 at 6:12 AM To: Matt Cheah mch...@palantir.com Cc: dev@spark.apache.org dev@spark.apache.org, Mingyu Kim m...@palantir.com, Andrew Ash a...@palantir.com Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning? It looks like this was fixed in https://issues.apache.org/jira/browse/SPARK-4743 / https://github.com/apache/spark/pull/3605. Can you see whether that patch fixes this issue for you? On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah mch...@palantir.com wrote: Hi everyone, I was using JavaPairRDD’s combineByKey() to compute all of my aggregations before, since I assumed that every aggregation required a key. However, I realized I could do my analysis using JavaRDD’s aggregate() instead and not use a key. I have set spark.serializer to use Kryo. As a result, JavaRDD’s combineByKey requires that a “createCombiner” function is provided, and the return value from that function must be serializable using Kryo. When I switched to using rdd.aggregate I assumed that the zero value would also be strictly Kryo serialized, as it is a data item and not part of a closure or the aggregation functions. However, I got a serialization exception as the closure serializer (only valid serializer is the Java serializer) was used instead. I was wondering the following: What is the rationale for making the zero value be serialized using the closure serializer? This isn’t part of the closure, but is an initial data item. Would it make sense for us to perhaps write a version of rdd.aggregate() that takes a function as a parameter, that generates the zero value? This would be more intuitive to be serialized using the closure serializer. I believe aggregateByKey is also affected. Thanks, -Matt Cheah
Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?
Yes, that's a bug and should be using the standard serializer. On Wed, Feb 18, 2015 at 2:58 PM, Sean Owen so...@cloudera.com wrote: That looks, at the least, inconsistent. As far as I know this should be changed so that the zero value is always cloned via the non-closure serializer. Any objection to that? On Wed, Feb 18, 2015 at 10:28 PM, Matt Cheah mch...@palantir.com wrote: But RDD.aggregate() has this code: // Clone the zero value since we will also be serializing it as part of tasks var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance()) I do see the SparkEnv.get.serializer used in aggregateByKey however. Perhaps we just missed it and need to apply the change to aggregate()? It seems appropriate to target a fix for 1.3.0. -Matt Cheah From: Josh Rosen rosenvi...@gmail.com Date: Wednesday, February 18, 2015 at 6:12 AM To: Matt Cheah mch...@palantir.com Cc: dev@spark.apache.org dev@spark.apache.org, Mingyu Kim m...@palantir.com, Andrew Ash a...@palantir.com Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning? It looks like this was fixed in https://issues.apache.org/jira/browse/SPARK-4743 / https://github.com/apache/spark/pull/3605. Can you see whether that patch fixes this issue for you? On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah mch...@palantir.com wrote: Hi everyone, I was using JavaPairRDD’s combineByKey() to compute all of my aggregations before, since I assumed that every aggregation required a key. However, I realized I could do my analysis using JavaRDD’s aggregate() instead and not use a key. I have set spark.serializer to use Kryo. As a result, JavaRDD’s combineByKey requires that a “createCombiner” function is provided, and the return value from that function must be serializable using Kryo. When I switched to using rdd.aggregate I assumed that the zero value would also be strictly Kryo serialized, as it is a data item and not part of a closure or the aggregation functions. However, I got a serialization exception as the closure serializer (only valid serializer is the Java serializer) was used instead. I was wondering the following: What is the rationale for making the zero value be serialized using the closure serializer? This isn’t part of the closure, but is an initial data item. Would it make sense for us to perhaps write a version of rdd.aggregate() that takes a function as a parameter, that generates the zero value? This would be more intuitive to be serialized using the closure serializer. I believe aggregateByKey is also affected. Thanks, -Matt Cheah
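Concretely, the change being discussed would be on the order of the following in RDD.aggregate (a sketch of this thread's proposal, not the patch as eventually merged):

  // Before (1.2.x): the zero value is cloned with the closure (Java) serializer:
  //   var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
  // Proposed: clone with the configured spark.serializer instead, matching
  // what aggregateByKey already does via SparkEnv.get.serializer:
  var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())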
Re: Batch prediciton for ALS
Please create a JIRA for it and we should discuss the APIs first before updating the code. -Xiangrui On Tue, Feb 17, 2015 at 4:10 PM, Debasish Das debasish.da...@gmail.com wrote: It would really help us if we merge it, but I guess it has already diverged from the new ALS... I will also take a look at it again and try to update it with the new ALS... On Tue, Feb 17, 2015 at 3:22 PM, Xiangrui Meng men...@gmail.com wrote: It may be too late to merge it into 1.3. I'm going to make another pass on your PR today. -Xiangrui On Tue, Feb 10, 2015 at 8:01 AM, Debasish Das debasish.da...@gmail.com wrote: Hi, Will it be possible to merge this PR into 1.3? https://github.com/apache/spark/pull/3098 The batch prediction API in ALS will be useful for those of us who want to cross-validate on prec@k and MAP... Thanks. Deb
[Performance] Possible regression in rdd.take()?
Hi everyone, Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take() consistently has a slower execution time on the later release. I was wondering if anyone else has had similar observations.
I have two setups where this reproduces. The first is a local test. I launched a Spark cluster with 4 worker JVMs on my Mac, and launched a Spark shell. I retrieved the text file and immediately called rdd.take(N) on it, where N varied. The RDD is a plaintext CSV, 4GB in size, split over 8 files, which ends up having 128 partitions, and a total of 8000 rows. The numbers I measured for Spark 1.0.2 and Spark 1.1.1 are below, all in seconds:
1 item
Spark 1.0.2: 0.069281, 0.012261, 0.011083
Spark 1.1.1: 0.11577, 0.097636, 0.11321
4 items
Spark 1.0.2: 0.023751, 0.069365, 0.023603
Spark 1.1.1: 0.224287, 0.229651, 0.158431
10 items
Spark 1.0.2: 0.047019, 0.049056, 0.042568
Spark 1.1.1: 0.353277, 0.288965, 0.281751
40 items
Spark 1.0.2: 0.216048, 0.198049, 0.796037
Spark 1.1.1: 1.865622, 2.224424, 2.037672
This small test suite indicates a consistently reproducible performance regression.
I also notice this on a larger-scale test. The cluster used is on EC2:
ec2 instance type: m2.4xlarge
10 slaves, 1 master
ephemeral storage
70 cores, 50 GB/box
In this case, I have a 100GB dataset split into 78 files totaling 350 million items, and I take the first 50,000 items from the RDD. In this case, I have tested this on different formats of the raw data.
With plaintext files:
Spark 1.0.2: 0.422s, 0.363s, 0.382s
Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s
With snappy-compressed Avro files:
Spark 1.0.2: 0.73s, 0.395s, 0.426s
Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s
Again demonstrating a reproducible performance regression. I was wondering if anyone else observed this regression, and if so, if anyone would have any idea what could possibly have caused it between Spark 1.0.2 and Spark 1.1.1?
Thanks, -Matt Cheah
Re: [VOTE] Release Apache Spark 1.3.0 (RC1)
On Wed, Feb 18, 2015 at 6:13 PM, Patrick Wendell pwend...@gmail.com wrote:
>> Patrick this link gives a 404: https://people.apache.org/keys/committer/pwendell.asc
> Works for me. Maybe it's some ephemeral issue?
Yes, works now; I swear it didn't before! That's all set now. The signing key is in that file.
Re: [Performance] Possible regression in rdd.take()?
I believe the heuristic governing the way that take() decides to fetch partitions changed between these versions. It could be that in certain cases the new heuristic is worse, but it might be good to just look at the source code and see, for your number of elements taken and number of partitions, if there was any effective change in how aggressively spark fetched partitions. This was quite a while ago, but I think the change was made because in many cases the newer code works more efficiently. - Patrick On Wed, Feb 18, 2015 at 4:47 PM, Matt Cheah mch...@palantir.com wrote: Hi everyone, Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take() consistently has a slower execution time on the later release. I was wondering if anyone else has had similar observations. I have two setups where this reproduces. The first is a local test. I launched a spark cluster with 4 worker JVMs on my Mac, and launched a Spark-Shell. I retrieved the text file and immediately called rdd.take(N) on it, where N varied. The RDD is a plaintext CSV, 4GB in size, split over 8 files, which ends up having 128 partitions, and a total of 8000 rows. The numbers I discovered between Spark 1.0.2 and Spark 1.1.1 are, with all numbers being in seconds: 1 items Spark 1.0.2: 0.069281, 0.012261, 0.011083 Spark 1.1.1: 0.11577, 0.097636, 0.11321 4 items Spark 1.0.2: 0.023751, 0.069365, 0.023603 Spark 1.1.1: 0.224287, 0.229651, 0.158431 10 items Spark 1.0.2: 0.047019, 0.049056, 0.042568 Spark 1.1.1: 0.353277, 0.288965, 0.281751 40 items Spark 1.0.2: 0.216048, 0.198049, 0.796037 Spark 1.1.1: 1.865622, 2.224424, 2.037672 This small test suite indicates a consistently reproducible performance regression. I also notice this on a larger scale test. The cluster used is on EC2: ec2 instance type: m2.4xlarge 10 slaves, 1 master ephemeral storage 70 cores, 50 GB/box In this case, I have a 100GB dataset split into 78 files totally 350 million items, and I take the first 50,000 items from the RDD. In this case, I have tested this on different formats of the raw data. With plaintext files: Spark 1.0.2: 0.422s, 0.363s, 0.382s Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s With snappy-compressed Avro files: Spark 1.0.2: 0.73s, 0.395s, 0.426s Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s Again demonstrating a reproducible performance regression. I was wondering if anyone else observed this regression, and if so, if anyone would have any idea what could possibly have caused it between Spark 1.0.2 and Spark 1.1.1? Thanks, -Matt Cheah
Re: [Performance] Possible regression in rdd.take()?
I actually tested Spark 1.2.0 with the code in the rdd.take() method swapped out for what was in Spark 1.0.2. The run time was still slower, which indicates to me something at work lower in the stack. -Matt Cheah On 2/18/15, 4:54 PM, Patrick Wendell pwend...@gmail.com wrote: I believe the heuristic governing the way that take() decides to fetch partitions changed between these versions. It could be that in certain cases the new heuristic is worse, but it might be good to just look at the source code and see, for your number of elements taken and number of partitions, if there was any effective change in how aggressively spark fetched partitions. This was quite a while ago, but I think the change was made because in many cases the newer code works more efficiently. - Patrick On Wed, Feb 18, 2015 at 4:47 PM, Matt Cheah mch...@palantir.com wrote: Hi everyone, Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take() consistently has a slower execution time on the later release. I was wondering if anyone else has had similar observations. I have two setups where this reproduces. The first is a local test. I launched a spark cluster with 4 worker JVMs on my Mac, and launched a Spark-Shell. I retrieved the text file and immediately called rdd.take(N) on it, where N varied. The RDD is a plaintext CSV, 4GB in size, split over 8 files, which ends up having 128 partitions, and a total of 8000 rows. The numbers I discovered between Spark 1.0.2 and Spark 1.1.1 are, with all numbers being in seconds: 1 items Spark 1.0.2: 0.069281, 0.012261, 0.011083 Spark 1.1.1: 0.11577, 0.097636, 0.11321 4 items Spark 1.0.2: 0.023751, 0.069365, 0.023603 Spark 1.1.1: 0.224287, 0.229651, 0.158431 10 items Spark 1.0.2: 0.047019, 0.049056, 0.042568 Spark 1.1.1: 0.353277, 0.288965, 0.281751 40 items Spark 1.0.2: 0.216048, 0.198049, 0.796037 Spark 1.1.1: 1.865622, 2.224424, 2.037672 This small test suite indicates a consistently reproducible performance regression. I also notice this on a larger scale test. The cluster used is on EC2: ec2 instance type: m2.4xlarge 10 slaves, 1 master ephemeral storage 70 cores, 50 GB/box In this case, I have a 100GB dataset split into 78 files totally 350 million items, and I take the first 50,000 items from the RDD. In this case, I have tested this on different formats of the raw data. With plaintext files: Spark 1.0.2: 0.422s, 0.363s, 0.382s Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s With snappy-compressed Avro files: Spark 1.0.2: 0.73s, 0.395s, 0.426s Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s Again demonstrating a reproducible performance regression. I was wondering if anyone else observed this regression, and if so, if anyone would have any idea what could possibly have caused it between Spark 1.0.2 and Spark 1.1.1? Thanks, -Matt Cheah
Re: [Performance] Possible regression in rdd.take()?
Ah okay, I turned on spark.localExecution.enabled and the performance returned to what Spark 1.0.2 had. However, I can see how users can inadvertently incur memory and network strain in fetching the whole partition to the driver. I'll evaluate on my side if we want to turn this on or not. Thanks for the quick and accurate response! -Matt Cheah From: Aaron Davidson ilike...@gmail.com Date: Wednesday, February 18, 2015 at 5:25 PM To: Matt Cheah mch...@palantir.com Cc: Patrick Wendell pwend...@gmail.com, dev@spark.apache.org dev@spark.apache.org, Mingyu Kim m...@palantir.com, Sandor Van Wassenhove sand...@palantir.com Subject: Re: [Performance] Possible regression in rdd.take()? You might be seeing the result of this patch: https://github.com/apache/spark/commit/d069c5d9d2f6ce06389ca2ddf0b3ae4db72c5797 which was introduced in 1.1.1. This patch disabled the ability for take() to run without launching a Spark job, which means that the latency is significantly increased for small jobs (but not for large ones). You can try enabling local execution and seeing if your problem goes away. On Wed, Feb 18, 2015 at 5:10 PM, Matt Cheah mch...@palantir.com wrote: I actually tested Spark 1.2.0 with the code in the rdd.take() method swapped out for what was in Spark 1.0.2. The run time was still slower, which indicates to me something at work lower in the stack. -Matt Cheah On 2/18/15, 4:54 PM, Patrick Wendell pwend...@gmail.com wrote: I believe the heuristic governing the way that take() decides to fetch partitions changed between these versions. It could be that in certain cases the new heuristic is worse, but it might be good to just look at the source code and see, for your number of elements taken and number of partitions, if there was any effective change in how aggressively spark fetched partitions. This was quite a while ago, but I think the change was made because in many cases the newer code works more efficiently. - Patrick On Wed, Feb 18, 2015 at 4:47 PM, Matt Cheah mch...@palantir.com wrote: Hi everyone, Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take() consistently has a slower execution time on the later release. I was wondering if anyone else has had similar observations. I have two setups where this reproduces. The first is a local test. I launched a spark cluster with 4 worker JVMs on my Mac, and launched a Spark-Shell. I retrieved the text file and immediately called rdd.take(N) on it, where N varied. The RDD is a plaintext CSV, 4GB in size, split over 8 files, which ends up having 128 partitions, and a total of 8000 rows. The numbers I discovered between Spark 1.0.2 and Spark 1.1.1 are, with all numbers being in seconds: 1 items Spark 1.0.2: 0.069281, 0.012261, 0.011083 Spark 1.1.1: 0.11577, 0.097636, 0.11321 4 items Spark 1.0.2: 0.023751, 0.069365, 0.023603 Spark 1.1.1: 0.224287, 0.229651, 0.158431 10 items Spark 1.0.2: 0.047019, 0.049056, 0.042568 Spark 1.1.1: 0.353277, 0.288965, 0.281751 40 items Spark 1.0.2: 0.216048, 0.198049, 0.796037 Spark 1.1.1: 1.865622, 2.224424, 2.037672 This small test suite indicates a consistently reproducible performance regression. I also notice this on a larger scale test. The cluster used is on EC2: ec2 instance type: m2.4xlarge 10 slaves, 1 master ephemeral storage 70 cores, 50 GB/box In this case, I have a 100GB dataset split into 78 files totally 350 million items, and I take the first 50,000 items from the RDD. In this case, I have tested this on different formats of the raw data.
With plaintext files: Spark 1.0.2: 0.422s, 0.363s, 0.382s Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s With snappy-compressed Avro files: Spark 1.0.2: 0.73s, 0.395s, 0.426s Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s Again demonstrating a reproducible performance regression. I was wondering if anyone else observed this regression, and if so, if anyone would have any idea what could possibly have caused it between Spark 1.0.2 and Spark 1.1.1? Thanks, -Matt Cheah
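For anyone else who hits this: the setting Matt refers to is a plain boolean conf (a minimal sketch; weigh the memory and network caveat above before enabling it broadly):

  import org.apache.spark.SparkConf

  // Re-enables driver-local execution of small jobs such as take(),
  // restoring the pre-1.1.1 fast path at the cost of potentially pulling
  // a whole partition into the driver.
  val conf = new SparkConf()
    .set("spark.localExecution.enabled", "true")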
Re: [VOTE] Release Apache Spark 1.3.0 (RC1)
+1 (non-binding, of course)
1. Compiled OSX 10.10 (Yosemite) OK. Total time: 14:50 min
   mvn clean package -Pyarn -Dyarn.version=2.6.0 -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -DskipTests -Dscala-2.11
2. Tested pyspark, MLlib - running as well as comparing results with 1.1.x and 1.2.x
2.1. statistics (min, max, mean, Pearson, Spearman) OK
2.2. Linear/Ridge/Lasso Regression OK
   But MSE has increased from 40.81 to 105.86. Has some refactoring happened on SGD/Linear Models? Or do we have some extra parameters, or a change of defaults?
2.3. Decision Tree, Naive Bayes OK
2.4. KMeans OK
   Center And Scale OK
   WSSSE has come down slightly
2.5. RDD operations OK
   State of the Union Texts - MapReduce, Filter, sortByKey (word count)
2.6. Recommendation (Movielens medium dataset, ~1M ratings) OK
   Model evaluation/optimization (rank, numIter, lmbda) with itertools OK
3. Scala - MLlib
3.1. statistics (min, max, mean, Pearson, Spearman) OK
3.2. LinearRegressionWithSGD OK
3.3. Decision Tree OK
3.4. KMeans OK
3.5. Recommendation (Movielens medium dataset, ~1M ratings) OK
Cheers
k/
P.S.: For some reason, replacing import sqlContext.createSchemaRDD with import sqlContext.implicits._ doesn't do the implicit conversions. registerTempTable gives a syntax error. I will dig deeper tomorrow. Has anyone seen this?
On Wed, Feb 18, 2015 at 3:25 PM, Sean Owen so...@cloudera.com wrote: On Wed, Feb 18, 2015 at 6:13 PM, Patrick Wendell pwend...@gmail.com wrote: Patrick this link gives a 404: https://people.apache.org/keys/committer/pwendell.asc Works for me. Maybe it's some ephemeral issue? Yes works now; I swear it didn't before! that's all set now. The signing key is in that file.
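Regarding the P.S.: one guess (based on the 1.3 API changes, not a confirmed diagnosis of the error above) is that in 1.3 the implicits no longer convert an RDD directly into something exposing registerTempTable; you go through an explicit toDF() first. A sketch, with a made-up case class and path:

  import sqlContext.implicits._

  case class Person(name: String, age: Int)
  val people = sc.textFile("examples/src/main/resources/people.txt")
    .map(_.split(","))
    .map(p => Person(p(0), p(1).trim.toInt))
    .toDF()                       // explicit conversion in 1.3
  people.registerTempTable("people")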
Re: [Performance] Possible regression in rdd.take()?
You might be seeing the result of this patch: https://github.com/apache/spark/commit/d069c5d9d2f6ce06389ca2ddf0b3ae4db72c5797 which was introduced in 1.1.1. This patch disabled the ability for take() to run without launching a Spark job, which means that the latency is significantly increased for small jobs (but not for large ones). You can try enabling local execution and seeing if your problem goes away. On Wed, Feb 18, 2015 at 5:10 PM, Matt Cheah mch...@palantir.com wrote: I actually tested Spark 1.2.0 with the code in the rdd.take() method swapped out for what was in Spark 1.0.2. The run time was still slower, which indicates to me something at work lower in the stack. -Matt Cheah On 2/18/15, 4:54 PM, Patrick Wendell pwend...@gmail.com wrote: I believe the heuristic governing the way that take() decides to fetch partitions changed between these versions. It could be that in certain cases the new heuristic is worse, but it might be good to just look at the source code and see, for your number of elements taken and number of partitions, if there was any effective change in how aggressively spark fetched partitions. This was quite a while ago, but I think the change was made because in many cases the newer code works more efficiently. - Patrick On Wed, Feb 18, 2015 at 4:47 PM, Matt Cheah mch...@palantir.com wrote: Hi everyone, Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take() consistently has a slower execution time on the later release. I was wondering if anyone else has had similar observations. I have two setups where this reproduces. The first is a local test. I launched a spark cluster with 4 worker JVMs on my Mac, and launched a Spark-Shell. I retrieved the text file and immediately called rdd.take(N) on it, where N varied. The RDD is a plaintext CSV, 4GB in size, split over 8 files, which ends up having 128 partitions, and a total of 8000 rows. The numbers I discovered between Spark 1.0.2 and Spark 1.1.1 are, with all numbers being in seconds: 1 items Spark 1.0.2: 0.069281, 0.012261, 0.011083 Spark 1.1.1: 0.11577, 0.097636, 0.11321 4 items Spark 1.0.2: 0.023751, 0.069365, 0.023603 Spark 1.1.1: 0.224287, 0.229651, 0.158431 10 items Spark 1.0.2: 0.047019, 0.049056, 0.042568 Spark 1.1.1: 0.353277, 0.288965, 0.281751 40 items Spark 1.0.2: 0.216048, 0.198049, 0.796037 Spark 1.1.1: 1.865622, 2.224424, 2.037672 This small test suite indicates a consistently reproducible performance regression. I also notice this on a larger scale test. The cluster used is on EC2: ec2 instance type: m2.4xlarge 10 slaves, 1 master ephemeral storage 70 cores, 50 GB/box In this case, I have a 100GB dataset split into 78 files totally 350 million items, and I take the first 50,000 items from the RDD. In this case, I have tested this on different formats of the raw data. With plaintext files: Spark 1.0.2: 0.422s, 0.363s, 0.382s Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s With snappy-compressed Avro files: Spark 1.0.2: 0.73s, 0.395s, 0.426s Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s Again demonstrating a reproducible performance regression. I was wondering if anyone else observed this regression, and if so, if anyone would have any idea what could possibly have caused it between Spark 1.0.2 and Spark 1.1.1? Thanks, -Matt Cheah
Re: Streaming partitions to driver for use in .toLocalIterator
Another alternative would be to compress the partition in memory in a streaming fashion instead of calling .toArray on the iterator. Would it be an easier mitigation to the problem? Or is it hard to compress the rows one by one, without materializing the full partition in memory, using the compression algorithm Spark currently uses? Mingyu On 2/18/15, 1:01 PM, Imran Rashid iras...@cloudera.com wrote: This would be pretty tricky to do -- the issue is that right now sparkContext.runJob has you pass in a function from a partition to *one* result object that gets serialized and sent back: Iterator[T] => U, and that idea is baked pretty deep into a lot of the internals, DAGScheduler, Task, Executors, etc. Maybe another possibility worth considering: should we make it easy to go from N partitions to 2N partitions (or any other multiple obviously) without requiring a shuffle? For that matter, you should also be able to go from 2N to N without a shuffle as well. That change is also somewhat involved, though. Both are in theory possible, but I imagine they'd need really compelling use cases. An alternative would be to write your RDD to some other data store (e.g., HDFS) which has better support for reading data in a streaming fashion, though you would probably be unhappy with the overhead. On Wed, Feb 18, 2015 at 9:09 AM, Andrew Ash and...@andrewash.com wrote: Hi Spark devs, I'm creating a streaming export functionality for RDDs and am having some trouble with large partitions. The RDD.toLocalIterator() call pulls over a partition at a time to the driver, and then streams the RDD out from that partition before pulling in the next one. When you have large partitions though, you can OOM the driver, especially when multiple of these exports are happening in the same SparkContext. One idea I had was to repartition the RDD so partitions are smaller, but it's hard to know a priori what the partition count should be, and I'd like to avoid paying the shuffle cost if possible -- I think repartitioning to a higher partition count forces a shuffle. Is it feasible to rework this so the executor -> driver transfer in .toLocalIterator is a steady stream rather than a partition at a time? Thanks! Andrew
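Mingyu's suggestion might look roughly like this (a sketch using the SerializerInstance and CompressionCodec APIs as they stand in 1.x; compressPartition is a hypothetical helper, and the compressed result is still one byte array per partition on the executor):

  import java.io.ByteArrayOutputStream
  import scala.reflect.ClassTag
  import org.apache.spark.io.CompressionCodec
  import org.apache.spark.serializer.SerializerInstance

  // Stream elements one at a time into a compressed serialization stream,
  // instead of materializing iter.toArray before serializing.
  def compressPartition[T: ClassTag](iter: Iterator[T],
                                     ser: SerializerInstance,
                                     codec: CompressionCodec): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val stream = ser.serializeStream(codec.compressedOutputStream(bytes))
    iter.foreach(stream.writeObject(_))  // no full-partition array
    stream.close()
    bytes.toByteArray
  }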
Spark-SQL 1.2.0 sort by results are not consistent with Hive
According to the Hive documentation, sort by is supposed to order the results for each reducer. So if we set a single reducer, then the results should be sorted, right? But this is not happening. Any idea why? It looks like the settings I am using to restrict the number of reducers are not having an effect.
Tried the following:
Set spark.default.parallelism to 1
Set spark.sql.shuffle.partitions to 1
These were set in hive-site.xml and also inside the Spark shell.
Spark-SQL:
create table if not exists testSortBy (key int, name string, age int);
LOAD DATA LOCAL INPATH '/home/mapr/sample-name-age.txt' OVERWRITE INTO TABLE testSortBy;
select * from testSortBy;
1  Aditya     28
2  aash       25
3  prashanth  27
4  bharath    26
5  terry      27
6  nanda      26
7  pradeep    27
8  pratyay    26
set spark.default.parallelism=1;
set spark.sql.shuffle.partitions=1;
select name,age from testSortBy sort by age;
aash       25
bharath    26
prashanth  27
Aditya     28
nanda      26
pratyay    26
terry      27
pradeep    27
HIVE:
select name,age from testSortBy sort by age;
aash       25
bharath    26
nanda      26
pratyay    26
prashanth  27
terry      27
pradeep    27
Aditya     28
-- Kannan
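One way to narrow this down (a sketch from the Spark shell; sqlContext is assumed to be a HiveContext over the same table): ORDER BY guarantees a total ordering regardless of how many reducers actually run, so if the query below comes back sorted while SORT BY does not, the reducer-count settings are the likely culprit rather than the sorting itself.

  // Run against the testSortBy table above.
  sqlContext.sql("SET spark.sql.shuffle.partitions=1")
  val sorted = sqlContext.sql("SELECT name, age FROM testSortBy ORDER BY age")
  sorted.collect().foreach(println)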