Merging code into branch 1.3

2015-02-18 Thread Patrick Wendell
Hey Committers,

Now that Spark 1.3 rc1 is cut, please restrict branch-1.3 merges to
the following:

1. Fixes for issues blocking the 1.3 release (i.e. 1.2.X regressions)
2. Documentation and tests.
3. Fixes for non-blocker issues that are surgical, low-risk, and/or
outside of the core.

If there is a lower priority bug fix (a non-blocker) that requires
nontrivial code changes, do not merge it into 1.3. If something seems
borderline, feel free to reach out to me and we can work through it
together.

This is what we've done for the last few releases to make sure RCs
become progressively more stable, and it is important for helping
us cut timely releases.

Thanks!

- Patrick

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



[VOTE] Release Apache Spark 1.3.0 (RC1)

2015-02-18 Thread Patrick Wendell
Please vote on releasing the following candidate as Apache Spark version 1.3.0!

The tag to be voted on is v1.3.0-rc1 (commit f97b0d4a):
https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=f97b0d4a6b26504916816d7aefcf3132cd1da6c2

The release files, including signatures, digests, etc. can be found at:
http://people.apache.org/~pwendell/spark-1.3.0-rc1/

Release artifacts are signed with the following key:
https://people.apache.org/keys/committer/pwendell.asc

The staging repository for this release can be found at:
https://repository.apache.org/content/repositories/orgapachespark-1069/

The documentation corresponding to this release can be found at:
http://people.apache.org/~pwendell/spark-1.3.0-rc1-docs/

Please vote on releasing this package as Apache Spark 1.3.0!

The vote is open until Saturday, February 21, at 08:03 UTC and passes
if a majority of at least 3 +1 PMC votes are cast.

[ ] +1 Release this package as Apache Spark 1.3.0
[ ] -1 Do not release this package because ...

To learn more about Apache Spark, please see
http://spark.apache.org/

== How can I help test this release? ==
If you are a Spark user, you can help us test this release by
taking a Spark 1.2 workload and running on this release candidate,
then reporting any regressions.

== What justifies a -1 vote for this release? ==
This vote is happening towards the end of the 1.3 QA period,
so -1 votes should only occur for significant regressions from 1.2.1.
Bugs already present in 1.2.X, minor regressions, or bugs related
to new features will not block this release.

- Patrick

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Replacing Jetty with TomCat

2015-02-18 Thread Sean Owen
I do not think it makes sense to make the web server configurable.
Mostly because there's no real problem in running an HTTP service
internally based on Netty while you run your own HTTP service based on
something else like Tomcat. What's the problem?

On Wed, Feb 18, 2015 at 3:14 AM, Niranda Perera
niranda.per...@gmail.com wrote:
 Hi Sean,
 The main issue we have is running two web servers in a single product; we
 think it would not be an elegant solution.

 Could you please point me to the main areas where jetty server is tightly
 coupled or extension points where I could plug tomcat instead of jetty?
 If successful I could contribute it to the spark project. :-)

 cheers



 On Mon, Feb 16, 2015 at 4:51 PM, Sean Owen so...@cloudera.com wrote:

 There's no particular reason you have to remove the embedded Jetty
 server, right? it doesn't prevent you from using it inside another app
 that happens to run in Tomcat. You won't be able to switch it out
 without rewriting a fair bit of code, no, but you don't need to.

 On Mon, Feb 16, 2015 at 5:08 AM, Niranda Perera
 niranda.per...@gmail.com wrote:
  Hi,
 
  We are thinking of integrating Spark server inside a product. Our
  current
  product uses Tomcat as its webserver.
 
  Is it possible to switch the Jetty webserver in Spark to Tomcat
  off-the-shelf?
 
  Cheers
 
  --
  Niranda




 --
 Niranda

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Sean Owen
The serializer is created with

val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)

Which is definitely not the closure serializer and so should respect
what you are setting with spark.serializer.

Maybe you can do a quick bit of debugging to see where that assumption
breaks down? Like, are you sure spark.serializer is set everywhere?
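
For illustration, a minimal sketch (Spark 1.2/1.3-era API; the app name is a
placeholder) of setting spark.serializer and then checking which serializer
SparkEnv actually instantiated:

import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

val conf = new SparkConf()
  .setAppName("serializer-check")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

// The serializer the driver actually instantiated; a task can log the same
// thing to confirm the setting reached the executors too.
println(SparkEnv.get.serializer.getClass.getName)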

On Wed, Feb 18, 2015 at 4:31 AM, Matt Cheah mch...@palantir.com wrote:
 Hi everyone,

 I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
 before, since I assumed that every aggregation required a key. However, I
 realized I could do my analysis using JavaRDD’s aggregate() instead and not
 use a key.

 I have set spark.serializer to use Kryo. As a result, JavaRDD’s combineByKey
 requires that a “createCombiner” function is provided, and the return value
 from that function must be serializable using Kryo. When I switched to using
 rdd.aggregate I assumed that the zero value would also be strictly Kryo
 serialized, as it is a data item and not part of a closure or the
 aggregation functions. However, I got a serialization exception as the
 closure serializer (only valid serializer is the Java serializer) was used
 instead.

 I was wondering the following:

 What is the rationale for making the zero value be serialized using the
 closure serializer? This isn’t part of the closure, but is an initial data
 item.
 Would it make sense for us to perhaps write a version of rdd.aggregate()
 that takes a function as a parameter, that generates the zero value? This
 would be more intuitive to be serialized using the closure serializer.

 I believe aggregateByKey is also affected.

 Thanks,

 -Matt Cheah

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: [VOTE] Release Apache Spark 1.3.0 (RC1)

2015-02-18 Thread Patrick Wendell
 UISeleniumSuite:
 *** RUN ABORTED ***
   java.lang.NoClassDefFoundError: org/w3c/dom/ElementTraversal
 ...

This is a newer test suite. There is something flaky about it that we
should definitely fix, but IMO it's not a blocker.


 Patrick this link gives a 404:
 https://people.apache.org/keys/committer/pwendell.asc

Works for me. Maybe it's some ephemeral issue?

 Finally, I already realized I failed to get the fix for
 https://issues.apache.org/jira/browse/SPARK-5669 correct, and that has
 to be correct for the release. I'll patch that up straight away,
 sorry. I believe the result of the intended fix is still as I
 described in SPARK-5669, so there is no bad news there. A local test
 seems to confirm it and I'm waiting on Jenkins. If it's all good I'll
 merge that fix. So, that much will need a new release, I apologize.

Thanks for finding this. I'm going to leave this open for continued testing...

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: quick jenkins restart tomorrow morning, ~7am PST

2015-02-18 Thread shane knapp
i'm actually going to do this now -- it's really quiet today.

there are two spark pull request builds running, which i will kill and
retrigger once jenkins is back up:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27689/
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27690/

On Wed, Feb 18, 2015 at 12:55 PM, shane knapp skn...@berkeley.edu wrote:

 i'll be kicking jenkins to up the open file limits on the workers.  it
 should be a very short downtime, and i'll post updates on my progress
 tomorrow.

 shane



quick jenkins restart tomorrow morning, ~7am PST

2015-02-18 Thread shane knapp
i'll be kicking jenkins to up the open file limits on the workers.  it
should be a very short downtime, and i'll post updates on my progress
tomorrow.

shane


Re: Streaming partitions to driver for use in .toLocalIterator

2015-02-18 Thread Imran Rashid
This would be pretty tricky to do -- the issue is that right now
sparkContext.runJob has you pass in a function from a partition to *one*
result object that gets serialized and sent back: Iterator[T] = U, and
that idea is baked pretty deep into a lot of the internals, DAGScheduler,
Task, Executors, etc.
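
As a concrete illustration of that shape (the RDD and element type here are
hypothetical), each partition's iterator is collapsed into one value before it
comes back to the driver:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// One result object per partition: Iterator[String] => Long
def partitionSizes(sc: SparkContext, rdd: RDD[String]): Array[Long] =
  sc.runJob(rdd, (iter: Iterator[String]) => iter.size.toLong)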

Maybe another possibility worth considering: should we make it easy to go
from N partitions to 2N partitions (or any other multiple obviously)
without requiring a shuffle?  for that matter, you should also be able to
go from 2N to N without a shuffle as well.  That change is also somewhat
involved, though.

Both are in theory possible, but I imagine they'd need really compelling
use cases.

An alternative would be to write your RDD to some other data store (eg,
hdfs) which has better support for reading data in a streaming fashion,
though you would probably be unhappy with the overhead.
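
A rough sketch of that alternative (the path is a placeholder): write once with
one part file per partition, then read the files back as plain streams, record
by record, so nothing is ever collected whole onto the driver:

import org.apache.spark.rdd.RDD

def exportViaHdfs[T](rdd: RDD[T], path: String): Unit =
  rdd.map(_.toString).saveAsTextFile(path)  // e.g. "hdfs://namenode:8020/tmp/export"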



On Wed, Feb 18, 2015 at 9:09 AM, Andrew Ash and...@andrewash.com wrote:

 Hi Spark devs,

 I'm creating a streaming export functionality for RDDs and am having some
 trouble with large partitions.  The RDD.toLocalIterator() call pulls over a
 partition at a time to the driver, and then streams the RDD out from that
 partition before pulling in the next one.  When you have large partitions
 though, you can OOM the driver, especially when multiple of these exports
 are happening in the same SparkContext.

 One idea I had was to repartition the RDD so partitions are smaller, but
 it's hard to know a priori what the partition count should be, and I'd like
 to avoid paying the shuffle cost if possible -- I think repartition to a
 higher partition count forces a shuffle.

 Is it feasible to rework this so the executor -> driver transfer in
 .toLocalIterator is a steady stream rather than a partition at a time?

 Thanks!
 Andrew



Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Matt Cheah
But RDD.aggregate() has this code:

// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())

I do see the SparkEnv.get.serializer used in aggregateByKey however. Perhaps
we just missed it and need to apply the change to aggregate()? It seems
appropriate to target a fix for 1.3.0.

-Matt Cheah
From:  Josh Rosen rosenvi...@gmail.com
Date:  Wednesday, February 18, 2015 at 6:12 AM
To:  Matt Cheah mch...@palantir.com
Cc:  dev@spark.apache.org dev@spark.apache.org, Mingyu Kim
m...@palantir.com, Andrew Ash a...@palantir.com
Subject:  Re: JavaRDD Aggregate initial value - Closure-serialized zero
value reasoning?

It looks like this was fixed in
https://issues.apache.org/jira/browse/SPARK-4743 /
https://github.com/apache/spark/pull/3605.  Can you see
whether that patch fixes this issue for you?



On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah mch...@palantir.com wrote:
 Hi everyone,
 
 I was using JavaPairRDD's combineByKey() to compute all of my aggregations
 before, since I assumed that every aggregation required a key. However, I
 realized I could do my analysis using JavaRDD's aggregate() instead and not
 use a key.
 
 I have set spark.serializer to use Kryo. As a result, JavaRDD's combineByKey
 requires that a "createCombiner" function is provided, and the return value
 from that function must be serializable using Kryo. When I switched to using
 rdd.aggregate I assumed that the zero value would also be strictly Kryo
 serialized, as it is a data item and not part of a closure or the aggregation
 functions. However, I got a serialization exception as the closure serializer
 (only valid serializer is the Java serializer) was used instead.
 
 I was wondering the following:
 1. What is the rationale for making the zero value be serialized using the
 closure serializer? This isn't part of the closure, but is an initial data
 item.
 2. Would it make sense for us to perhaps write a version of rdd.aggregate()
 that takes a function as a parameter, that generates the zero value? This
 would be more intuitive to be serialized using the closure serializer.
 I believe aggregateByKey is also affected.
 
 Thanks,
 
 -Matt Cheah







If job fails shuffle space is not cleaned

2015-02-18 Thread Debasish Das
Hi,

Some of my jobs failed due to no space left on device and on those jobs I
was monitoring the shuffle space...when the job failed shuffle space did
not clean and I had to manually clean it...

Is there a JIRA already tracking this issue ? If no one has been assigned
to it, I can take a look.

Thanks.
Deb


Re: [VOTE] Release Apache Spark 1.3.0 (RC1)

2015-02-18 Thread Sean Owen
On OS X and Ubuntu I see the following test failure in the source
release for 1.3.0-RC1:

UISeleniumSuite:
*** RUN ABORTED ***
  java.lang.NoClassDefFoundError: org/w3c/dom/ElementTraversal
...


Patrick this link gives a 404:
https://people.apache.org/keys/committer/pwendell.asc


Finally, I already realized I failed to get the fix for
https://issues.apache.org/jira/browse/SPARK-5669 correct, and that has
to be correct for the release. I'll patch that up straight away,
sorry. I believe the result of the intended fix is still as I
described in SPARK-5669, so there is no bad news there. A local test
seems to confirm it and I'm waiting on Jenkins. If it's all good I'll
merge that fix. So, that much will need a new release, I apologize.


Please keep testing anyway!


Otherwise, I verified the signatures are correct, licenses are
correct, compiles on OS X and Ubuntu 14.


On Wed, Feb 18, 2015 at 8:12 AM, Patrick Wendell pwend...@gmail.com wrote:
 Please vote on releasing the following candidate as Apache Spark version 
 1.3.0!

 The tag to be voted on is v1.3.0-rc1 (commit f97b0d4a):
 https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=f97b0d4a6b26504916816d7aefcf3132cd1da6c2

 The release files, including signatures, digests, etc. can be found at:
 http://people.apache.org/~pwendell/spark-1.3.0-rc1/

 Release artifacts are signed with the following key:
 https://people.apache.org/keys/committer/pwendell.asc

 The staging repository for this release can be found at:
 https://repository.apache.org/content/repositories/orgapachespark-1069/

 The documentation corresponding to this release can be found at:
 http://people.apache.org/~pwendell/spark-1.3.0-rc1-docs/

 Please vote on releasing this package as Apache Spark 1.3.0!

 The vote is open until Saturday, February 21, at 08:03 UTC and passes
 if a majority of at least 3 +1 PMC votes are cast.

 [ ] +1 Release this package as Apache Spark 1.3.0
 [ ] -1 Do not release this package because ...

 To learn more about Apache Spark, please see
 http://spark.apache.org/

 == How can I help test this release? ==
 If you are a Spark user, you can help us test this release by
 taking a Spark 1.2 workload and running on this release candidate,
 then reporting any regressions.

 == What justifies a -1 vote for this release? ==
 This vote is happening towards the end of the 1.3 QA period,
 so -1 votes should only occur for significant regressions from 1.2.1.
 Bugs already present in 1.2.X, minor regressions, or bugs related
 to new features will not block this release.

 - Patrick

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org


-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Josh Rosen
It looks like this was fixed in
https://issues.apache.org/jira/browse/SPARK-4743 /
https://github.com/apache/spark/pull/3605.  Can you see whether that patch
fixes this issue for you?



On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah mch...@palantir.com wrote:

 Hi everyone,

 I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
 before, since I assumed that every aggregation required a key. However, I
 realized I could do my analysis using JavaRDD’s aggregate() instead and not
 use a key.

 I have set spark.serializer to use Kryo. As a result, JavaRDD’s
 combineByKey requires that a “createCombiner” function is provided, and the
 return value from that function must be serializable using Kryo. When I
 switched to using rdd.aggregate I assumed that the zero value would also be
 strictly Kryo serialized, as it is a data item and not part of a closure or
 the aggregation functions. However, I got a serialization exception as the
 closure serializer (only valid serializer is the Java serializer) was used
 instead.

 I was wondering the following:

1. What is the rationale for making the zero value be serialized using
the closure serializer? This isn’t part of the closure, but is an initial
data item.
2. Would it make sense for us to perhaps write a version of
rdd.aggregate() that takes a function as a parameter, that generates the
zero value? This would be more intuitive to be serialized using the closure
serializer.

 I believe aggregateByKey is also affected.

 Thanks,

 -Matt Cheah



Issue SPARK-5008 (persistent-hdfs broken)

2015-02-18 Thread Joe Wass
I've recently run into problems caused by ticket SPARK-5008

https://issues.apache.org/jira/browse/SPARK-5008

This seems like quite a serious regression in 1.2.0, meaning that it's not
really possible to use persistent-hdfs. The config for the persistent-hdfs
points to the wrong part of the filesystem, so it comes up on the wrong
volume (and therefore has the wrong capacity). I'm working around it with
symlinks, but it's not ideal.

It doesn't look like it's scheduled to be fixed in any particular release.
Is there any indication of whether this is on anyone's todo list?

If no-one's looking into it then I could try having a look myself, but I'm
not (yet) familiar with the internals. From the discussion on the ticket it
doesn't look like a huge fix.

Cheers

Joe


Streaming partitions to driver for use in .toLocalIterator

2015-02-18 Thread Andrew Ash
Hi Spark devs,

I'm creating a streaming export functionality for RDDs and am having some
trouble with large partitions.  The RDD.toLocalIterator() call pulls over a
partition at a time to the driver, and then streams the RDD out from that
partition before pulling in the next one.  When you have large partitions
though, you can OOM the driver, especially when multiple of these exports
are happening in the same SparkContext.
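
For reference, a bare-bones sketch of the export pattern described above
(names are placeholders): toLocalIterator runs one job per partition and only
ever holds one partition's rows on the driver, which is exactly where a single
oversized partition can blow up driver memory:

import java.io.PrintWriter
import org.apache.spark.rdd.RDD

def exportRdd[T](rdd: RDD[T], path: String): Unit = {
  val out = new PrintWriter(path)
  try rdd.toLocalIterator.foreach(row => out.println(row.toString))
  finally out.close()
}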

One idea I had was to repartition the RDD so partitions are smaller, but
it's hard to know a priori what the partition count should be, and I'd like
to avoid paying the shuffle cost if possible -- I think repartition to a
higher partition count forces a shuffle.

Is it feasible to rework this so the executor -> driver transfer in
.toLocalIterator is a steady stream rather than a partition at a time?

Thanks!
Andrew


Re: [ml] Lost persistence for fold in crossvalidation.

2015-02-18 Thread Joseph Bradley
Now in JIRA form: https://issues.apache.org/jira/browse/SPARK-5844

On Tue, Feb 17, 2015 at 3:12 PM, Xiangrui Meng men...@gmail.com wrote:

 There are three different regParams defined in the grid and there are
 three folds. For simplicity, we didn't split the dataset into three and
 reuse them, but do the split for each fold. Then we need to cache 3*3
 times. Note that the pipeline API is not yet optimized for
 performance. It would be nice to optimize its performance in 1.4.
 -Xiangrui
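
For illustration, a small user-side sketch (assuming the names in Peter's
snippet quoted below, with the top-level training SchemaRDD called dataset):
persisting the input once before fitting, so the nine fits at least re-read
cached data rather than recomputing the upstream lineage; the per-fold splits
are still cached separately, as noted above.

import org.apache.spark.storage.StorageLevel

dataset.persist(StorageLevel.MEMORY_AND_DISK)
val cvModel = crossval.fit(dataset)
dataset.unpersist()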

 On Wed, Feb 11, 2015 at 11:13 AM, Peter Rudenko petro.rude...@gmail.com
 wrote:
  Hi i have a problem. Using spark 1.2 with Pipeline + GridSearch +
  LogisticRegression. I’ve reimplemented LogisticRegression.fit method and
  comment out instances.unpersist()
 
  override def fit(dataset: SchemaRDD, paramMap: ParamMap): LogisticRegressionModel = {
    println(s"Fitting dataset ${dataset.take(1000).toSeq.hashCode()} with ParamMap $paramMap.")
    transformSchema(dataset.schema, paramMap, logging = true)
    import dataset.sqlContext._
    val map = this.paramMap ++ paramMap
    val instances = dataset.select(map(labelCol).attr, map(featuresCol).attr)
      .map {
        case Row(label: Double, features: Vector) =>
          LabeledPoint(label, features)
      }

    if (instances.getStorageLevel == StorageLevel.NONE) {
      println("Instances not persisted")
      instances.persist(StorageLevel.MEMORY_AND_DISK)
    }

    val lr = (new LogisticRegressionWithLBFGS)
      .setValidateData(false)
      .setIntercept(true)
    lr.optimizer
      .setRegParam(map(regParam))
      .setNumIterations(map(maxIter))
    val lrm = new LogisticRegressionModel(this, map, lr.run(instances).weights)
    // instances.unpersist()
    // copy model params
    Params.inheritValues(map, this, lrm)
    lrm
  }
 
  CrossValidator feeds the same SchemaRDD for each parameter (same hash code),
  but somewhere the cache is being flushed. There is enough memory. Here's the
  output:
 
  Fitting dataset 2051470010 with ParamMap {
  DRLogisticRegression-f35ae4d3-regParam: 0.1
  }.
  Instances not persisted
  Fitting dataset 2051470010 with ParamMap {
  DRLogisticRegression-f35ae4d3-regParam: 0.01
  }.
  Instances not persisted
  Fitting dataset 2051470010 with ParamMap {
  DRLogisticRegression-f35ae4d3-regParam: 0.001
  }.
  Instances not persisted
  Fitting dataset 802615223 with ParamMap {
  DRLogisticRegression-f35ae4d3-regParam: 0.1
  }.
  Instances not persisted
  Fitting dataset 802615223 with ParamMap {
  DRLogisticRegression-f35ae4d3-regParam: 0.01
  }.
  Instances not persisted
 
  I have 3 parameters in GridSearch and 3 folds for CrossValidation:
 
  val paramGrid = new ParamGridBuilder()
    .addGrid(model.regParam, Array(0.1, 0.01, 0.001))
    .build()

  crossval.setEstimatorParamMaps(paramGrid)
  crossval.setNumFolds(3)
 
  I assume that the data should be read and cached 3 times, i.e. (1 to
  numFolds).combinations(2), and be independent of the number of parameters. But
  I see the data being read and cached 9 times.
 
  Thanks,
  Peter Rudenko
 
 

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org




Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Sean Owen
That looks, at the least, inconsistent. As far as I know this should
be changed so that the zero value is always cloned via the non-closure
serializer. Any objection to that?

On Wed, Feb 18, 2015 at 10:28 PM, Matt Cheah mch...@palantir.com wrote:
 But RDD.aggregate() has this code:

 // Clone the zero value since we will also be serializing it as part of
 tasks
 var jobResult = Utils.clone(zeroValue,
 sc.env.closureSerializer.newInstance())

 I do see the SparkEnv.get.serializer used in aggregateByKey however. Perhaps
 we just missed it and need to apply the change to aggregate()? It seems
 appropriate to target a fix for 1.3.0.

 -Matt Cheah
 From: Josh Rosen rosenvi...@gmail.com
 Date: Wednesday, February 18, 2015 at 6:12 AM
 To: Matt Cheah mch...@palantir.com
 Cc: dev@spark.apache.org dev@spark.apache.org, Mingyu Kim
 m...@palantir.com, Andrew Ash a...@palantir.com
 Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero value
 reasoning?

 It looks like this was fixed in
 https://issues.apache.org/jira/browse/SPARK-4743 /
 https://github.com/apache/spark/pull/3605.  Can you see whether that patch
 fixes this issue for you?



 On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah mch...@palantir.com wrote:

 Hi everyone,

 I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
 before, since I assumed that every aggregation required a key. However, I
 realized I could do my analysis using JavaRDD’s aggregate() instead and not
 use a key.

 I have set spark.serializer to use Kryo. As a result, JavaRDD’s
 combineByKey requires that a “createCombiner” function is provided, and the
 return value from that function must be serializable using Kryo. When I
 switched to using rdd.aggregate I assumed that the zero value would also be
 strictly Kryo serialized, as it is a data item and not part of a closure or
 the aggregation functions. However, I got a serialization exception as the
 closure serializer (only valid serializer is the Java serializer) was used
 instead.

 I was wondering the following:

 What is the rationale for making the zero value be serialized using the
 closure serializer? This isn’t part of the closure, but is an initial data
 item.
 Would it make sense for us to perhaps write a version of rdd.aggregate()
 that takes a function as a parameter, that generates the zero value? This
 would be more intuitive to be serialized using the closure serializer.

 I believe aggregateByKey is also affected.

 Thanks,

 -Matt Cheah



-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Reynold Xin
Yes, that's a bug and should be using the standard serializer.
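
For anyone following along, a self-contained sketch of the symptom this thread
describes (class and app name are hypothetical): a zero value that Kryo could
handle but Java serialization cannot.

import org.apache.spark.{SparkConf, SparkContext}

// Deliberately not java.io.Serializable.
class KryoOnlyZero(var total: Long)

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("zero-value-test")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

// Per this thread (pre SPARK-4743), the zero value is cloned with the closure
// (Java) serializer, so this is expected to fail with a NotSerializableException
// even though spark.serializer is Kryo.
val result = sc.parallelize(1 to 100).aggregate(new KryoOnlyZero(0L))(
  (z, x) => { z.total += x; z },
  (a, b) => { a.total += b.total; a })
println(result.total)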

On Wed, Feb 18, 2015 at 2:58 PM, Sean Owen so...@cloudera.com wrote:

 That looks, at the least, inconsistent. As far as I know this should
 be changed so that the zero value is always cloned via the non-closure
 serializer. Any objection to that?

 On Wed, Feb 18, 2015 at 10:28 PM, Matt Cheah mch...@palantir.com wrote:
  But RDD.aggregate() has this code:
 
  // Clone the zero value since we will also be serializing it as part
 of
  tasks
  var jobResult = Utils.clone(zeroValue,
  sc.env.closureSerializer.newInstance())
 
  I do see the SparkEnv.get.serializer used in aggregateByKey however.
 Perhaps
  we just missed it and need to apply the change to aggregate()? It seems
  appropriate to target a fix for 1.3.0.
 
  -Matt Cheah
  From: Josh Rosen rosenvi...@gmail.com
  Date: Wednesday, February 18, 2015 at 6:12 AM
  To: Matt Cheah mch...@palantir.com
  Cc: dev@spark.apache.org dev@spark.apache.org, Mingyu Kim
  m...@palantir.com, Andrew Ash a...@palantir.com
  Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero
 value
  reasoning?
 
  It looks like this was fixed in
  https://issues.apache.org/jira/browse/SPARK-4743 /
  https://github.com/apache/spark/pull/3605.  Can you see whether that
 patch
  fixes this issue for you?
 
 
 
  On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah mch...@palantir.com wrote:
 
  Hi everyone,
 
  I was using JavaPairRDD’s combineByKey() to compute all of my
 aggregations
  before, since I assumed that every aggregation required a key. However,
 I
  realized I could do my analysis using JavaRDD’s aggregate() instead and
 not
  use a key.
 
  I have set spark.serializer to use Kryo. As a result, JavaRDD’s
  combineByKey requires that a “createCombiner” function is provided, and
 the
  return value from that function must be serializable using Kryo. When I
  switched to using rdd.aggregate I assumed that the zero value would
 also be
  strictly Kryo serialized, as it is a data item and not part of a
 closure or
  the aggregation functions. However, I got a serialization exception as
 the
  closure serializer (only valid serializer is the Java serializer) was
 used
  instead.
 
  I was wondering the following:
 
  What is the rationale for making the zero value be serialized using the
  closure serializer? This isn’t part of the closure, but is an initial
 data
  item.
  Would it make sense for us to perhaps write a version of rdd.aggregate()
  that takes a function as a parameter, that generates the zero value?
 This
  would be more intuitive to be serialized using the closure serializer.
 
  I believe aggregateByKey is also affected.
 
  Thanks,
 
  -Matt Cheah
 
 

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org




Re: Batch prediciton for ALS

2015-02-18 Thread Xiangrui Meng
Please create a JIRA for it and we should discuss the APIs first
before updating the code. -Xiangrui

On Tue, Feb 17, 2015 at 4:10 PM, Debasish Das debasish.da...@gmail.com wrote:
 It will really help us if we merge it, but I guess it has already diverged
 from the new ALS... I will also take a look at it again and try to update it
 for the new ALS...

 On Tue, Feb 17, 2015 at 3:22 PM, Xiangrui Meng men...@gmail.com wrote:

 It may be too late to merge it into 1.3. I'm going to make another
 pass on your PR today. -Xiangrui

 On Tue, Feb 10, 2015 at 8:01 AM, Debasish Das debasish.da...@gmail.com
 wrote:
  Hi,
 
  Will it be possible to merge this PR to 1.3 ?
 
  https://github.com/apache/spark/pull/3098
 
  The batch prediction API in ALS will be useful for us who want to cross
  validate on prec@k and MAP...
 
  Thanks.
  Deb



-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



[Performance] Possible regression in rdd.take()?

2015-02-18 Thread Matt Cheah
Hi everyone,

Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take()
consistently has a slower execution time on the later release. I was
wondering if anyone else has had similar observations.

I have two setups where this reproduces. The first is a local test. I
launched a spark cluster with 4 worker JVMs on my Mac, and launched a
Spark-Shell. I retrieved the text file and immediately called rdd.take(N) on
it, where N varied. The RDD is a plaintext CSV, 4GB in size, split over 8
files, which ends up having 128 partitions, and a total of 8000 rows.
The numbers I discovered between Spark 1.0.2 and Spark 1.1.1 are, with all
numbers being in seconds:
1 items

Spark 1.0.2: 0.069281, 0.012261, 0.011083

Spark 1.1.1: 0.11577, 0.097636, 0.11321



4 items

Spark 1.0.2: 0.023751, 0.069365, 0.023603

Spark 1.1.1: 0.224287, 0.229651, 0.158431



10 items

Spark 1.0.2: 0.047019, 0.049056, 0.042568

Spark 1.1.1: 0.353277, 0.288965, 0.281751



40 items

Spark 1.0.2: 0.216048, 0.198049, 0.796037

Spark 1.1.1: 1.865622, 2.224424, 2.037672

This small test suite indicates a consistently reproducible performance
regression.
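
For reference, a simple sketch of how such take() timings can be collected in
spark-shell (the path and counts below are placeholders, not the exact setup
used here):

val rdd = sc.textFile("/path/to/csv-directory")

def timeTake(n: Int): Double = {
  val start = System.nanoTime()
  rdd.take(n)
  (System.nanoTime() - start) / 1e9  // seconds
}

Seq(1, 4, 10, 40).foreach(n => println(s"take($n): ${timeTake(n)} s"))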



I also notice this on a larger scale test. The cluster used is on EC2:

ec2 instance type: m2.4xlarge
10 slaves, 1 master
ephemeral storage
70 cores, 50 GB/box
In this case, I have a 100GB dataset split into 78 files totaling 350 million
items, and I take the first 50,000 items from the RDD. In this case, I have
tested this on different formats of the raw data.

With plaintext files:

Spark 1.0.2: 0.422s, 0.363s, 0.382s

Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s



With snappy-compressed Avro files:

Spark 1.0.2: 0.73s, 0.395s, 0.426s

Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s

Again demonstrating a reproducible performance regression.

I was wondering if anyone else observed this regression, and if so, if
anyone would have any idea what could possibly have caused it between Spark
1.0.2 and Spark 1.1.1?

Thanks,

-Matt Cheah






Re: [VOTE] Release Apache Spark 1.3.0 (RC1)

2015-02-18 Thread Sean Owen
On Wed, Feb 18, 2015 at 6:13 PM, Patrick Wendell pwend...@gmail.com wrote:
 Patrick this link gives a 404:
 https://people.apache.org/keys/committer/pwendell.asc

 Works for me. Maybe it's some ephemeral issue?

Yes, it works now; I swear it didn't before! That's all set now. The
signing key is in that file.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: [Performance] Possible regression in rdd.take()?

2015-02-18 Thread Patrick Wendell
I believe the heuristic governing the way that take() decides to fetch
partitions changed between these versions. It could be that in certain
cases the new heuristic is worse, but it might be good to just look at
the source code and see, for your number of elements taken and number
of partitions, if there was any effective change in how aggressively
spark fetched partitions.

This was quite a while ago, but I think the change was made because in
many cases the newer code works more efficiently.

- Patrick

On Wed, Feb 18, 2015 at 4:47 PM, Matt Cheah mch...@palantir.com wrote:
 Hi everyone,

 Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take()
 consistently has a slower execution time on the later release. I was
 wondering if anyone else has had similar observations.

 I have two setups where this reproduces. The first is a local test. I
 launched a spark cluster with 4 worker JVMs on my Mac, and launched a
 Spark-Shell. I retrieved the text file and immediately called rdd.take(N) on
 it, where N varied. The RDD is a plaintext CSV, 4GB in size, split over 8
 files, which ends up having 128 partitions, and a total of 8000 rows.
 The numbers I discovered between Spark 1.0.2 and Spark 1.1.1 are, with all
 numbers being in seconds:

 1 items

 Spark 1.0.2: 0.069281, 0.012261, 0.011083

 Spark 1.1.1: 0.11577, 0.097636, 0.11321


 4 items

 Spark 1.0.2: 0.023751, 0.069365, 0.023603

 Spark 1.1.1: 0.224287, 0.229651, 0.158431


 10 items

 Spark 1.0.2: 0.047019, 0.049056, 0.042568

 Spark 1.1.1: 0.353277, 0.288965, 0.281751


 40 items

 Spark 1.0.2: 0.216048, 0.198049, 0.796037

 Spark 1.1.1: 1.865622, 2.224424, 2.037672

 This small test suite indicates a consistently reproducible performance
 regression.


 I also notice this on a larger scale test. The cluster used is on EC2:

 ec2 instance type: m2.4xlarge
 10 slaves, 1 master
 ephemeral storage
 70 cores, 50 GB/box

 In this case, I have a 100GB dataset split into 78 files totally 350 million
 items, and I take the first 50,000 items from the RDD. In this case, I have
 tested this on different formats of the raw data.

 With plaintext files:

 Spark 1.0.2: 0.422s, 0.363s, 0.382s

 Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s


 With snappy-compressed Avro files:

 Spark 1.0.2: 0.73s, 0.395s, 0.426s

 Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s

 Again demonstrating a reproducible performance regression.

 I was wondering if anyone else observed this regression, and if so, if
 anyone would have any idea what could possibly have caused it between Spark
 1.0.2 and Spark 1.1.1?

 Thanks,

 -Matt Cheah

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: [Performance] Possible regression in rdd.take()?

2015-02-18 Thread Matt Cheah
I actually tested Spark 1.2.0 with the code in the rdd.take() method
swapped out for what was in Spark 1.0.2. The run time was still slower,
which indicates to me something at work lower in the stack.

-Matt Cheah

On 2/18/15, 4:54 PM, Patrick Wendell pwend...@gmail.com wrote:

I believe the heuristic governing the way that take() decides to fetch
partitions changed between these versions. It could be that in certain
cases the new heuristic is worse, but it might be good to just look at
the source code and see, for your number of elements taken and number
of partitions, if there was any effective change in how aggressively
spark fetched partitions.

This was quite a while ago, but I think the change was made because in
many cases the newer code works more efficiently.

- Patrick

On Wed, Feb 18, 2015 at 4:47 PM, Matt Cheah mch...@palantir.com wrote:
 Hi everyone,

 Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take()
 consistently has a slower execution time on the later release. I was
 wondering if anyone else has had similar observations.

 I have two setups where this reproduces. The first is a local test. I
 launched a spark cluster with 4 worker JVMs on my Mac, and launched a
 Spark-Shell. I retrieved the text file and immediately called
rdd.take(N) on
 it, where N varied. The RDD is a plaintext CSV, 4GB in size, split over
8
 files, which ends up having 128 partitions, and a total of 8000
rows.
 The numbers I discovered between Spark 1.0.2 and Spark 1.1.1 are, with
all
 numbers being in seconds:

 1 items

 Spark 1.0.2: 0.069281, 0.012261, 0.011083

 Spark 1.1.1: 0.11577, 0.097636, 0.11321


 4 items

 Spark 1.0.2: 0.023751, 0.069365, 0.023603

 Spark 1.1.1: 0.224287, 0.229651, 0.158431


 10 items

 Spark 1.0.2: 0.047019, 0.049056, 0.042568

 Spark 1.1.1: 0.353277, 0.288965, 0.281751


 40 items

 Spark 1.0.2: 0.216048, 0.198049, 0.796037

 Spark 1.1.1: 1.865622, 2.224424, 2.037672

 This small test suite indicates a consistently reproducible performance
 regression.


 I also notice this on a larger scale test. The cluster used is on EC2:

 ec2 instance type: m2.4xlarge
 10 slaves, 1 master
 ephemeral storage
 70 cores, 50 GB/box

 In this case, I have a 100GB dataset split into 78 files totally 350
million
 items, and I take the first 50,000 items from the RDD. In this case, I
have
 tested this on different formats of the raw data.

 With plaintext files:

 Spark 1.0.2: 0.422s, 0.363s, 0.382s

 Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s


 With snappy-compressed Avro files:

 Spark 1.0.2: 0.73s, 0.395s, 0.426s

 Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s

 Again demonstrating a reproducible performance regression.

 I was wondering if anyone else observed this regression, and if so, if
 anyone would have any idea what could possibly have caused it between
Spark
 1.0.2 and Spark 1.1.1?

 Thanks,

 -Matt Cheah




Re: [Performance] Possible regression in rdd.take()?

2015-02-18 Thread Matt Cheah
Ah okay, I turned on spark.localExecution.enabled and the performance
returned to what Spark 1.0.2 had. However I can see how users can
inadvertently incur memory and network strain in fetching the whole
partition to the driver.

I'll evaluate on my side if we want to turn this on or not. Thanks for the
quick and accurate response!

-Matt Cheah

From:  Aaron Davidson ilike...@gmail.com
Date:  Wednesday, February 18, 2015 at 5:25 PM
To:  Matt Cheah mch...@palantir.com
Cc:  Patrick Wendell pwend...@gmail.com, dev@spark.apache.org
dev@spark.apache.org, Mingyu Kim m...@palantir.com, Sandor Van
Wassenhove sand...@palantir.com
Subject:  Re: [Performance] Possible regression in rdd.take()?

You might be seeing the result of this patch:

https://github.com/apache/spark/commit/d069c5d9d2f6ce06389ca2ddf0b3ae4db72c5797

which was introduced in 1.1.1. This patch disabled the ability for take() to
run without launching a Spark job, which means that the latency is
significantly increased for small jobs (but not for large ones). You can try
enabling local execution and seeing if your problem goes away.

On Wed, Feb 18, 2015 at 5:10 PM, Matt Cheah mch...@palantir.com wrote:
 I actually tested Spark 1.2.0 with the code in the rdd.take() method
 swapped out for what was in Spark 1.0.2. The run time was still slower,
 which indicates to me something at work lower in the stack.
 
 -Matt Cheah
 
 On 2/18/15, 4:54 PM, Patrick Wendell pwend...@gmail.com wrote:
 
 I believe the heuristic governing the way that take() decides to fetch
 partitions changed between these versions. It could be that in certain
 cases the new heuristic is worse, but it might be good to just look at
 the source code and see, for your number of elements taken and number
 of partitions, if there was any effective change in how aggressively
 spark fetched partitions.
 
 This was quite a while ago, but I think the change was made because in
 many cases the newer code works more efficiently.
 
 - Patrick
 
 On Wed, Feb 18, 2015 at 4:47 PM, Matt Cheah mch...@palantir.com wrote:
  Hi everyone,
 
  Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take()
  consistently has a slower execution time on the later release. I was
  wondering if anyone else has had similar observations.
 
  I have two setups where this reproduces. The first is a local test. I
  launched a spark cluster with 4 worker JVMs on my Mac, and launched a
  Spark-Shell. I retrieved the text file and immediately called
 rdd.take(N) on
  it, where N varied. The RDD is a plaintext CSV, 4GB in size, split over
 8
  files, which ends up having 128 partitions, and a total of 8000
 rows.
  The numbers I discovered between Spark 1.0.2 and Spark 1.1.1 are, with
 all
  numbers being in seconds:
 
  1 items
 
  Spark 1.0.2: 0.069281, 0.012261, 0.011083
 
  Spark 1.1.1: 0.11577, 0.097636, 0.11321
 
 
  4 items
 
  Spark 1.0.2: 0.023751, 0.069365, 0.023603
 
  Spark 1.1.1: 0.224287, 0.229651, 0.158431
 
 
  10 items
 
  Spark 1.0.2: 0.047019, 0.049056, 0.042568
 
  Spark 1.1.1: 0.353277, 0.288965, 0.281751
 
 
  40 items
 
  Spark 1.0.2: 0.216048, 0.198049, 0.796037
 
  Spark 1.1.1: 1.865622, 2.224424, 2.037672
 
  This small test suite indicates a consistently reproducible performance
  regression.
 
 
  I also notice this on a larger scale test. The cluster used is on EC2:
 
  ec2 instance type: m2.4xlarge
  10 slaves, 1 master
  ephemeral storage
  70 cores, 50 GB/box
 
  In this case, I have a 100GB dataset split into 78 files totally 350
 million
  items, and I take the first 50,000 items from the RDD. In this case, I
 have
  tested this on different formats of the raw data.
 
  With plaintext files:
 
  Spark 1.0.2: 0.422s, 0.363s, 0.382s
 
  Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s
 
 
  With snappy-compressed Avro files:
 
  Spark 1.0.2: 0.73s, 0.395s, 0.426s
 
  Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s
 
  Again demonstrating a reproducible performance regression.
 
  I was wondering if anyone else observed this regression, and if so, if
  anyone would have any idea what could possibly have caused it between
 Spark
  1.0.2 and Spark 1.1.1?
 
  Thanks,
 
  -Matt Cheah







Re: [VOTE] Release Apache Spark 1.3.0 (RC1)

2015-02-18 Thread Krishna Sankar
+1 (non-binding, of course)

1. Compiled OSX 10.10 (Yosemite) OK Total time: 14:50 min
 mvn clean package -Pyarn -Dyarn.version=2.6.0 -Phadoop-2.4
-Dhadoop.version=2.6.0 -Phive -DskipTests -Dscala-2.11
2. Tested pyspark, mllib - running as well as comparing results with 1.1.x and
1.2.x
2.1. statistics (min,max,mean,Pearson,Spearman) OK
2.2. Linear/Ridge/Lasso Regression OK

But MSE has increased from 40.81 to 105.86. Has some refactoring happened
on SGD/Linear Models? Or do we have some extra parameters? Or a change
of defaults?

2.3. Decision Tree, Naive Bayes OK
2.4. KMeans OK
   Center And Scale OK
   WSSSE has come down slightly
2.5. rdd operations OK
  State of the Union Texts - MapReduce, Filter,sortByKey (word count)
2.6. Recommendation (Movielens medium dataset ~1 M ratings) OK
   Model evaluation/optimization (rank, numIter, lmbda) with itertools
OK
3. Scala - MLlib
3.1. statistics (min,max,mean,Pearson,Spearman) OK
3.2. LinearRegressionWIthSGD OK
3.3. Decision Tree OK
3.4. KMeans OK
3.5. Recommendation (Movielens medium dataset ~1 M ratings) OK

Cheers
k/
P.S: For some reason replacing import sqlContext.createSchemaRDD with
import sqlContext.implicits._ doesn't do the implicit conversions.
registerTempTable gives a syntax error. I will dig deeper tomorrow. Has anyone
seen this?

On Wed, Feb 18, 2015 at 3:25 PM, Sean Owen so...@cloudera.com wrote:

 On Wed, Feb 18, 2015 at 6:13 PM, Patrick Wendell pwend...@gmail.com
 wrote:
  Patrick this link gives a 404:
  https://people.apache.org/keys/committer/pwendell.asc
 
  Works for me. Maybe it's some ephemeral issue?

 Yes works now; I swear it didn't before! that's all set now. The
 signing key is in that file.

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org




Re: [Performance] Possible regression in rdd.take()?

2015-02-18 Thread Aaron Davidson
You might be seeing the result of this patch:

https://github.com/apache/spark/commit/d069c5d9d2f6ce06389ca2ddf0b3ae4db72c5797

which was introduced in 1.1.1. This patch disabled the ability for take()
to run without launching a Spark job, which means that the latency is
significantly increased for small jobs (but not for large ones). You can
try enabling local execution and seeing if your problem goes away.
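
For reference, a sketch of what enabling that looks like (the property name is
the one referenced elsewhere in this thread; the app name is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("take-latency-test")
  .set("spark.localExecution.enabled", "true")  // let small take() jobs run on the driver again
val sc = new SparkContext(conf)
// Or at submit time: spark-submit --conf spark.localExecution.enabled=true ...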

On Wed, Feb 18, 2015 at 5:10 PM, Matt Cheah mch...@palantir.com wrote:

 I actually tested Spark 1.2.0 with the code in the rdd.take() method
 swapped out for what was in Spark 1.0.2. The run time was still slower,
 which indicates to me something at work lower in the stack.

 -Matt Cheah

 On 2/18/15, 4:54 PM, Patrick Wendell pwend...@gmail.com wrote:

 I believe the heuristic governing the way that take() decides to fetch
 partitions changed between these versions. It could be that in certain
 cases the new heuristic is worse, but it might be good to just look at
 the source code and see, for your number of elements taken and number
 of partitions, if there was any effective change in how aggressively
 spark fetched partitions.
 
 This was quite a while ago, but I think the change was made because in
 many cases the newer code works more efficiently.
 
 - Patrick
 
 On Wed, Feb 18, 2015 at 4:47 PM, Matt Cheah mch...@palantir.com wrote:
  Hi everyone,
 
  Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take()
  consistently has a slower execution time on the later release. I was
  wondering if anyone else has had similar observations.
 
  I have two setups where this reproduces. The first is a local test. I
  launched a spark cluster with 4 worker JVMs on my Mac, and launched a
  Spark-Shell. I retrieved the text file and immediately called
 rdd.take(N) on
  it, where N varied. The RDD is a plaintext CSV, 4GB in size, split over
 8
  files, which ends up having 128 partitions, and a total of 8000
 rows.
  The numbers I discovered between Spark 1.0.2 and Spark 1.1.1 are, with
 all
  numbers being in seconds:
 
  1 items
 
  Spark 1.0.2: 0.069281, 0.012261, 0.011083
 
  Spark 1.1.1: 0.11577, 0.097636, 0.11321
 
 
  4 items
 
  Spark 1.0.2: 0.023751, 0.069365, 0.023603
 
  Spark 1.1.1: 0.224287, 0.229651, 0.158431
 
 
  10 items
 
  Spark 1.0.2: 0.047019, 0.049056, 0.042568
 
  Spark 1.1.1: 0.353277, 0.288965, 0.281751
 
 
  40 items
 
  Spark 1.0.2: 0.216048, 0.198049, 0.796037
 
  Spark 1.1.1: 1.865622, 2.224424, 2.037672
 
  This small test suite indicates a consistently reproducible performance
  regression.
 
 
  I also notice this on a larger scale test. The cluster used is on EC2:
 
  ec2 instance type: m2.4xlarge
  10 slaves, 1 master
  ephemeral storage
  70 cores, 50 GB/box
 
  In this case, I have a 100GB dataset split into 78 files totally 350
 million
  items, and I take the first 50,000 items from the RDD. In this case, I
 have
  tested this on different formats of the raw data.
 
  With plaintext files:
 
  Spark 1.0.2: 0.422s, 0.363s, 0.382s
 
  Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s
 
 
  With snappy-compressed Avro files:
 
  Spark 1.0.2: 0.73s, 0.395s, 0.426s
 
  Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s
 
  Again demonstrating a reproducible performance regression.
 
  I was wondering if anyone else observed this regression, and if so, if
  anyone would have any idea what could possibly have caused it between
 Spark
  1.0.2 and Spark 1.1.1?
 
  Thanks,
 
  -Matt Cheah



Re: Streaming partitions to driver for use in .toLocalIterator

2015-02-18 Thread Mingyu Kim
Another alternative would be to compress the partition in memory in a
streaming fashion instead of calling .toArray on the iterator. Would it be
an easier mitigation to the problem? Or, is it hard to compress the rows
one by one without materializing the full partition in memory using the
compression algo Spark uses currently?
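
As a rough illustration (plain JDK streams rather than Spark's internal
compression codec), rows can be written one at a time into a compressed buffer,
so the uncompressed partition never has to be materialized as a single array:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.zip.GZIPOutputStream

def compressPartition[T <: AnyRef](iter: Iterator[T]): Array[Byte] = {
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(new GZIPOutputStream(buffer))
  iter.foreach(x => out.writeObject(x))  // one row at a time, never the whole partition
  out.close()
  buffer.toByteArray  // the compressed partition, still held in memory
}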

Mingyu





On 2/18/15, 1:01 PM, Imran Rashid iras...@cloudera.com wrote:

This would be pretty tricky to do -- the issue is that right now
sparkContext.runJob has you pass in a function from a partition to *one*
result object that gets serialized and sent back: Iterator[T] => U, and
that idea is baked pretty deep into a lot of the internals, DAGScheduler,
Task, Executors, etc.

Maybe another possibility worth considering: should we make it easy to go
from N partitions to 2N partitions (or any other multiple obviously)
without requiring a shuffle?  for that matter, you should also be able to
go from 2N to N without a shuffle as well.  That change is also somewhat
involved, though.

Both are in theory possible, but I imagine they'd need really compelling
use cases.

An alternative would be to write your RDD to some other data store (eg,
hdfs) which has better support for reading data in a streaming fashion,
though you would probably be unhappy with the overhead.



On Wed, Feb 18, 2015 at 9:09 AM, Andrew Ash and...@andrewash.com wrote:

 Hi Spark devs,

 I'm creating a streaming export functionality for RDDs and am having
some
 trouble with large partitions.  The RDD.toLocalIterator() call pulls
over a
 partition at a time to the driver, and then streams the RDD out from
that
 partition before pulling in the next one.  When you have large
partitions
 though, you can OOM the driver, especially when multiple of these
exports
 are happening in the same SparkContext.

 One idea I had was to repartition the RDD so partitions are smaller, but
 it's hard to know a priori what the partition count should be, and I'd
like
 to avoid paying the shuffle cost if possible -- I think repartition to a
 higher partition count forces a shuffle.

 Is it feasible to rework this so the executor -> driver transfer in
 .toLocalIterator is a steady stream rather than a partition at a time?

 Thanks!
 Andrew



-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Spark-SQL 1.2.0 sort by results are not consistent with Hive

2015-02-18 Thread Kannan Rajah
According to hive documentation, sort by is supposed to order the results
for each reducer. So if we set a single reducer, then the results should be
sorted, right? But this is not happening. Any idea why? Looks like the
settings I am using to restrict the number of reducers are not having an
effect.

*Tried the following:*

Set spark.default.parallelism to 1

Set spark.sql.shuffle.partitions to 1

These were set in hive-site.xml and also inside spark shell.
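
For what it's worth, a small sketch (assuming the shell's sqlContext, a
HiveContext) of setting the property on the SQL context directly and reading it
back to confirm it took effect:

sqlContext.setConf("spark.sql.shuffle.partitions", "1")
println(sqlContext.getConf("spark.sql.shuffle.partitions"))  // should print 1
sqlContext.sql("SELECT name, age FROM testSortBy SORT BY age").collect().foreach(println)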


*Spark-SQL*

create table if not exists testSortBy (key int, name string, age int);
LOAD DATA LOCAL INPATH '/home/mapr/sample-name-age.txt' OVERWRITE INTO TABLE
testSortBy;
select * from testSortBY;

1    Aditya       28
2    aash         25
3    prashanth    27
4    bharath      26
5    terry        27
6    nanda        26
7    pradeep      27
8    pratyay      26


set spark.default.parallelism=1;

set spark.sql.shuffle.partitions=1;

select name,age from testSortBy sort by age;

aash         25
bharath      26
prashanth    27
Aditya       28
nanda        26
pratyay      26
terry        27
pradeep      27

*HIVE*

select name,age from testSortBy sort by age;

aash         25
bharath      26
nanda        26
pratyay      26
prashanth    27
terry        27
pradeep      27
Aditya       28


--
Kannan