Maximum size of vector that reduce can handle

2015-01-23 Thread Ulanov, Alexander
Dear Spark developers,

I am trying to measure Spark's reduce performance for big vectors. My 
motivation is related to machine learning gradients: a gradient is a vector 
that is computed on each worker, and all results then need to be summed up and 
broadcast back to the workers. Present machine learning applications involve 
very long parameter vectors; for deep neural networks they can reach 2 billion 
elements. So, I want to measure the time this operation needs depending on the 
size of the vector and the number of workers. I wrote a few lines of code that 
assume Spark will distribute partitions among all available workers. I have a 
6-machine cluster (Xeon 3.3GHz, 4 cores, 16GB RAM), each machine running 2 
Workers.

import org.apache.spark.mllib.rdd.RDDFunctions._
import breeze.linalg._
import org.apache.log4j._
Logger.getRootLogger.setLevel(Level.OFF)
val n = 6000
val p = 12
val vv = sc.parallelize(0 until p, p).map(i => DenseVector.rand[Double](n))
vv.reduce(_ + _)
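
To time it, a plain wall-clock measurement around the action is enough for my 
purposes (a sketch, not a rigorous benchmark):

val t0 = System.nanoTime
vv.reduce(_ + _)
println(s"reduce took ${(System.nanoTime - t0) / 1e9} seconds")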

When executed in the shell with a 60M-element vector, it crashes after some 
period of time. One of the nodes contains the following in stdout:
Java HotSpot(TM) 64-Bit Server VM warning: INFO: 
os::commit_memory(0x00075550, 2863661056, 0) failed; error='Cannot 
allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 2863661056 bytes for 
committing reserved memory.

I run the shell with --executor-memory 8G --driver-memory 8G, so handling a 
60M vector of Doubles should not be a problem. Are there any big overheads 
involved? What is the maximum size of vector that reduce can handle?

Best regards, Alexander

P.S. 

spark.driver.maxResultSize needs to be set to 0 in order to run this code. I 
also needed to change the java.io.tmpdir and spark.local.dir folders, because 
my /tmp folder, which is the default, was too small and Spark swaps heavily 
into it. Without these settings I get either no space left on device or 
out-of-memory exceptions.
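
For reference, the same settings can be passed when starting the shell; the 
local-dir paths below are placeholders for any disk with enough free space:

spark-shell --executor-memory 8G --driver-memory 8G \
  --conf spark.driver.maxResultSize=0 \
  --conf spark.local.dir=/mnt/spark-tmp \
  --driver-java-options "-Djava.io.tmpdir=/mnt/tmp"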

I also submitted a bug https://issues.apache.org/jira/browse/SPARK-5386

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: query planner design doc?

2015-01-23 Thread Nicholas Murphy
Okay, thanks.  The design document mostly details the infrastructure for 
optimization strategies but doesn't detail the strategies themselves. I take 
it the set of strategies is basically embodied in SparkStrategies.scala... is 
there a design doc/roadmap/JIRA issue detailing which strategies exist and 
which are planned?

Thanks,
Nick

 On Jan 22, 2015, at 7:45 PM, Michael Armbrust mich...@databricks.com wrote:
 
 Here is the initial design document for Catalyst:
 https://docs.google.com/document/d/1Hc_Ehtr0G8SQUg69cmViZsMi55_Kf3tISD9GPGU5M1Y/edit
 
 Strategies (many of which are in SparkStrategies.scala) are the part that 
 creates the physical operators from a Catalyst logical plan.  These operators 
 have execute() methods that actually call RDD operations.
 
 On Thu, Jan 22, 2015 at 3:19 PM, Nicholas Murphy halcyo...@gmail.com wrote:
 Hi-
 
 Quick question: is there a design doc (or something more than “look at the 
 code”) for the query planner for Spark SQL (i.e., the component that 
 takes…Catalyst?…operator trees and translates them into Spark operations)?
 
 Thanks,
 Nick
 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org
 
 



Re: Optimize encoding/decoding strings when using Parquet

2015-01-23 Thread Michael Davies
Added PR https://github.com/apache/spark/pull/4139 - I think tests have been 
re-arranged, so a merge is necessary.

Mick


 On 19 Jan 2015, at 18:31, Reynold Xin r...@databricks.com wrote:
 
 Definitely go for a pull request!
 
 
 On Mon, Jan 19, 2015 at 10:10 AM, Mick Davies michael.belldav...@gmail.com wrote:
 
 Looking at the Parquet code, it looks like hooks are already in place to
 support this.
 
 In particular, PrimitiveConverter has the methods hasDictionarySupport and
 addValueFromDictionary for this purpose. These are not used by
 CatalystPrimitiveConverter.
 
 I think it would be pretty straightforward to add this. Has anyone
 considered it? Shall I put a pull request together for it?
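 
 Roughly what I have in mind - a sketch only, with an illustrative class name
 and update callback (not Spark's actual converter), against the pre-Apache
 parquet namespace in use at the time:
 
 import parquet.column.Dictionary
 import parquet.io.api.{Binary, PrimitiveConverter}
 
 // Decode each dictionary entry to a String once, then serve repeated
 // values from the cache instead of re-decoding UTF-8 bytes per row.
 class CachedStringConverter(update: String => Unit) extends PrimitiveConverter {
   private var cache: Array[String] = _
 
   override def hasDictionarySupport(): Boolean = true
 
   override def setDictionary(dictionary: Dictionary): Unit = {
     cache = Array.tabulate(dictionary.getMaxId + 1) { i =>
       dictionary.decodeToBinary(i).toStringUsingUTF8
     }
   }
 
   // Dictionary-encoded pages become a plain array lookup.
   override def addValueFromDictionary(dictionaryId: Int): Unit =
     update(cache(dictionaryId))
 
   // Non-dictionary pages still arrive here.
   override def addBinary(value: Binary): Unit =
     update(value.toStringUsingUTF8)
 }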
 
 Mick
 
 
 
 --
  View this message in context: 
  http://apache-spark-developers-list.1001551.n3.nabble.com/Optimize-encoding-decoding-strings-when-using-Parquet-tp10141p10195.html
 Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org
 
 



RE: Maximum size of vector that reduce can handle

2015-01-23 Thread Ulanov, Alexander
Hi DB Tsai,

Thank you for your suggestion. Actually, I've started my experiments with 
treeReduce. Originally I had vv.treeReduce(_ + _, 2) in my script, exactly 
because the MLlib optimizers use it, as you pointed out with LBFGS. However, 
it leads to the same problems as reduce, though presumably not as directly. As 
far as I understand, treeReduce limits the number of communications between 
the workers and the master by forcing workers to compute the reduce operation 
partially.

Are you sure that the driver will first collect all results (or all partial 
results in treeReduce) and ONLY then perform the aggregation? If that is the 
problem, how can I force it to aggregate after receiving each portion of data 
from the Workers?

Best regards, Alexander

-Original Message-
From: DB Tsai [mailto:dbt...@dbtsai.com] 
Sent: Friday, January 23, 2015 11:53 AM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Maximum size of vector that reduce can handle

Hi Alexander,

When you use `reduce` to aggregate the vectors, they will actually be pulled 
into the driver and merged there. Obviously, that is not scalable given that 
you are doing deep neural networks, which have so many coefficients.

Please try treeReduce instead, which is what we do in linear regression and 
logistic regression.

See 
https://github.com/apache/spark/blob/branch-1.1/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
for example.

val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))(
  seqOp = (c, v) => (c, v) match {
    case ((grad, loss), (label, features)) =>
      val l = localGradient.compute(features, label, bcW.value, grad)
      (grad, loss + l)
  },
  combOp = (c1, c2) => (c1, c2) match {
    case ((grad1, loss1), (grad2, loss2)) =>
      axpy(1.0, grad2, grad1)
      (grad1, loss1 + loss2)
  })

Sincerely,

DB Tsai
---
Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai



On Fri, Jan 23, 2015 at 10:00 AM, Ulanov, Alexander alexander.ula...@hp.com 
wrote:
 Dear Spark developers,

 I am trying to measure Spark's reduce performance for big vectors. My 
 motivation is related to machine learning gradients: a gradient is a vector 
 that is computed on each worker, and all results then need to be summed up and 
 broadcast back to the workers. Present machine learning applications involve 
 very long parameter vectors; for deep neural networks they can reach 2 billion 
 elements. So, I want to measure the time this operation needs depending on the 
 size of the vector and the number of workers. I wrote a few lines of code that 
 assume Spark will distribute partitions among all available workers. I have a 
 6-machine cluster (Xeon 3.3GHz, 4 cores, 16GB RAM), each machine running 2 
 Workers.

 import org.apache.spark.mllib.rdd.RDDFunctions._
 import breeze.linalg._
 import org.apache.log4j._
 Logger.getRootLogger.setLevel(Level.OFF)
 val n = 6000
 val p = 12
 val vv = sc.parallelize(0 until p, p).map(i =>
 DenseVector.rand[Double](n))
 vv.reduce(_ + _)

 When executed in the shell with a 60M-element vector, it crashes after some 
 period of time. One of the nodes contains the following in stdout:
 Java HotSpot(TM) 64-Bit Server VM warning: INFO: 
 os::commit_memory(0x00075550, 2863661056, 0) failed; 
 error='Cannot allocate memory' (errno=12)
 #
 # There is insufficient memory for the Java Runtime Environment to continue.
 # Native memory allocation (malloc) failed to allocate 2863661056 bytes for 
 committing reserved memory.

 I run the shell with --executor-memory 8G --driver-memory 8G, so handling a 
 60M vector of Doubles should not be a problem. Are there any big overheads 
 involved? What is the maximum size of vector that reduce can handle?

 Best regards, Alexander

 P.S.

 spark.driver.maxResultSize needs to be set to 0 in order to run this code. I 
 also needed to change the java.io.tmpdir and spark.local.dir folders, because 
 my /tmp folder, which is the default, was too small and Spark swaps heavily 
 into it. Without these settings I get either no space left on device or 
 out-of-memory exceptions.

 I also submitted a bug 
 https://issues.apache.org/jira/browse/SPARK-5386

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org


-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: query planner design doc?

2015-01-23 Thread Michael Armbrust
No, are you looking for something in particular?

On Fri, Jan 23, 2015 at 9:44 AM, Nicholas Murphy halcyo...@gmail.com
wrote:

 Okay, thanks.  The design document mostly details the infrastructure for
 optimization strategies but doesn't detail the strategies themselves. I
 take it the set of strategies is basically embodied in
 SparkStrategies.scala... is there a design doc/roadmap/JIRA issue detailing
 which strategies exist and which are planned?

 Thanks,
 Nick

 On Jan 22, 2015, at 7:45 PM, Michael Armbrust mich...@databricks.com
 wrote:

 Here is the initial design document for Catalyst:

 https://docs.google.com/document/d/1Hc_Ehtr0G8SQUg69cmViZsMi55_Kf3tISD9GPGU5M1Y/edit

 Strategies (many of which are in SparkStrategies.scala) are the part that
 creates the physical operators from a Catalyst logical plan.  These
 operators have execute() methods that actually call RDD operations.

 On Thu, Jan 22, 2015 at 3:19 PM, Nicholas Murphy halcyo...@gmail.com
 wrote:

 Hi-

 Quick question: is there a design doc (or something more than “look at
 the code”) for the query planner for Spark SQL (i.e., the component that
 takes…Catalyst?…operator trees and translates them into Spark operations)?

 Thanks,
 Nick
 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org






Re: Spark 1.2.0: MissingRequirementError

2015-01-23 Thread Peter Prettenhofer
Much appreciated if somebody could help fix this issue -- or at least
give me some hints about what might be wrong.

thanks,
 Peter

2015-01-15 14:04 GMT+01:00 PierreB pierre.borckm...@realimpactanalytics.com:

 Hi guys,

 A few people seem to have the same problem with Spark 1.2.0, so I figured I
 would push it here.

 see:

 http://apache-spark-user-list.1001560.n3.nabble.com/MissingRequirementError-with-spark-td21149.html

 In a nutshell, for sbt test to work, we now need to fork a JVM and also give
 it more memory to be able to run the tests.

 See also:
 https://github.com/deanwampler/spark-workshop/blob/master/project/Build.scala

 This all used to work fine until 1.2.0.

 Could you have a look, please?
 Thanks

 P.




 --
 View this message in context:
 http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-1-2-0-MissingRequirementError-tp10123.html
 Sent from the Apache Spark Developers List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org




-- 
Peter Prettenhofer


Re: Maximum size of vector that reduce can handle

2015-01-23 Thread DB Tsai
Hi Alexander,

For `reduce`, it's an action that collects all the data from the mappers to
the driver and performs the aggregation on the driver. As a result, if the
output of each mapper is very large and the number of mapper partitions is
large, it can cause a problem.

For `treeReduce`, as the name indicates, the first layer aggregates the
output of the mappers two by two, halving the number of outputs. We then
continue the aggregation layer by layer. The final aggregation is still done
on the driver, but by that point the amount of data is small.

By default a depth of 2 is used, so if you have very many partitions of
large vectors, this may still cause an issue. You can increase the depth so
that in the final reduce on the driver, the number of remaining partitions
is very small.
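
For example, with the `vv` RDD from your script (a sketch):

// depth 2 (the default): two rounds of merging on the executors
vv.treeReduce(_ + _, 2)
// larger depth: more rounds, so fewer pre-merged vectors reach the driver
vv.treeReduce(_ + _, 4)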

Sincerely,

DB Tsai
---
Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai



On Fri, Jan 23, 2015 at 12:07 PM, Ulanov, Alexander
alexander.ula...@hp.com wrote:
 Hi DB Tsai,

 Thank you for your suggestion. Actually, I've started my experiments with 
 treeReduce. Originally I had vv.treeReduce(_ + _, 2) in my script, exactly 
 because the MLlib optimizers use it, as you pointed out with LBFGS. However, 
 it leads to the same problems as reduce, though presumably not as directly. 
 As far as I understand, treeReduce limits the number of communications 
 between the workers and the master by forcing workers to compute the reduce 
 operation partially.

 Are you sure that the driver will first collect all results (or all partial 
 results in treeReduce) and ONLY then perform the aggregation? If that is the 
 problem, how can I force it to aggregate after receiving each portion of 
 data from the Workers?

 Best regards, Alexander

 -Original Message-
 From: DB Tsai [mailto:dbt...@dbtsai.com]
 Sent: Friday, January 23, 2015 11:53 AM
 To: Ulanov, Alexander
 Cc: dev@spark.apache.org
 Subject: Re: Maximum size of vector that reduce can handle

 Hi Alexander,

 When you use `reduce` to aggregate the vectors, they will actually be pulled 
 into the driver and merged there. Obviously, that is not scalable given that 
 you are doing deep neural networks, which have so many coefficients.

 Please try treeReduce instead, which is what we do in linear regression and 
 logistic regression.

 See 
 https://github.com/apache/spark/blob/branch-1.1/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
 for example.

 val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))(
   seqOp = (c, v) => (c, v) match {
     case ((grad, loss), (label, features)) =>
       val l = localGradient.compute(features, label, bcW.value, grad)
       (grad, loss + l)
   },
   combOp = (c1, c2) => (c1, c2) match {
     case ((grad1, loss1), (grad2, loss2)) =>
       axpy(1.0, grad2, grad1)
       (grad1, loss1 + loss2)
   })

 Sincerely,

 DB Tsai
 ---
 Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai



 On Fri, Jan 23, 2015 at 10:00 AM, Ulanov, Alexander alexander.ula...@hp.com 
 wrote:
 Dear Spark developers,

 I am trying to measure Spark's reduce performance for big vectors. My 
 motivation is related to machine learning gradients: a gradient is a vector 
 that is computed on each worker, and all results then need to be summed up 
 and broadcast back to the workers. Present machine learning applications 
 involve very long parameter vectors; for deep neural networks they can reach 
 2 billion elements. So, I want to measure the time this operation needs 
 depending on the size of the vector and the number of workers. I wrote a few 
 lines of code that assume Spark will distribute partitions among all 
 available workers. I have a 6-machine cluster (Xeon 3.3GHz, 4 cores, 
 16GB RAM), each machine running 2 Workers.

 import org.apache.spark.mllib.rdd.RDDFunctions._
 import breeze.linalg._
 import org.apache.log4j._
 Logger.getRootLogger.setLevel(Level.OFF)
 val n = 6000
 val p = 12
 val vv = sc.parallelize(0 until p, p).map(i =>
 DenseVector.rand[Double](n))
 vv.reduce(_ + _)

 When executed in the shell with a 60M-element vector, it crashes after some 
 period of time. One of the nodes contains the following in stdout:
 Java HotSpot(TM) 64-Bit Server VM warning: INFO:
 os::commit_memory(0x00075550, 2863661056, 0) failed;
 error='Cannot allocate memory' (errno=12)
 #
 # There is insufficient memory for the Java Runtime Environment to continue.
 # Native memory allocation (malloc) failed to allocate 2863661056 bytes for 
 committing reserved memory.

 I run the shell with --executor-memory 8G --driver-memory 8G, so handling a 
 60M vector of Doubles should not be a problem. Are there any big overheads 
 involved? What is the maximum size of vector that reduce can handle?

 Best regards, Alexander

 P.S.

 spark.driver.maxResultSize needs to be set to 0 in order to run this code. I 
 also needed to change

Re: Spark performance gains for small queries

2015-01-23 Thread Saumitra Shahapure (Vizury)
Hey Matei,

Thanks for your reply. We will keep in mind to use the JDBC server for smaller
queries.
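
For the record, a minimal way to run that long-lived process, per the Spark SQL
docs (host and port are the defaults):

./sbin/start-thriftserver.sh
./bin/beeline -u jdbc:hive2://localhost:10000
-- then, at the beeline prompt:
select count(*) from T where col3=123;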

For the MapReduce job start-up, are you pointing at JVM initialization
latencies in MR? Other than JVM initialization, does Spark do any
optimization (that MapReduce does not) to speed up start-up?

--
Regards,
Saumitra Shahapure

On Fri, Jan 23, 2015 at 2:08 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 It's hard to tell without more details, but the start-up latency in Hive
 can sometimes be high, especially if you are running Hive on MapReduce. MR
 just takes 20-30 seconds per job to spin up even if the job is doing
 nothing.

 For real use of Spark SQL for short queries by the way, I'd recommend
 using the JDBC server so that you can have a long-running Spark process. It
 gets quite a bit faster after the first few queries.

 Matei

  On Jan 22, 2015, at 10:22 PM, Saumitra Shahapure (Vizury) 
 saumitra.shahap...@vizury.com wrote:
 
  Hello,
 
  We were comparing the performance of some of our production Hive queries
  between Hive and Spark. We compared Hive (0.13) + Hadoop (1.2.1) against
  both Spark 0.9 and 1.1. We could see that the performance gains in Spark
  have been good.
 
  We tried a very simple query,
  select count(*) from T where col3=123
  in both Spark SQL and Hive (with hive.map.aggr=true), and found that Spark's
  performance was 2x better than Hive's (60 sec vs 120 sec). Table T is
  stored in S3 as a single 600MB GZIP file.
 
  My question is: why is Spark faster than Hive here? In both cases the file
  will be downloaded, uncompressed, and the lines counted by a single
  process. In Hive's case the reducer will be an identity function, since
  hive.map.aggr is true.
 
  Note that disk spills and network I/O are very low in Hive's case as well.
  --
  Regards,
  Saumitra Shahapure




Find the two storage locations of each partition of a replicated RDD

2015-01-23 Thread Rapelly Kartheek
Hi,

I want to find the storage locations (BlockManagerIds) of each partition when
an RDD is replicated twice. I mean, if a twice-replicated RDD has 5
partitions, I would like to know the first and second storage location of
each partition. Basically, I am trying to modify the list of nodes selected
for replicating an RDD.

I just want to check where exactly the first and second copy of each
partition gets stored. I looked at the RDD storage details in the webUI, but
couldn't learn much from them.

Any help please!

Thank you
Karthik
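
P.S. A sketch of what I am after, relying on Spark internals: BlockManagerMaster
is private[spark], so this helper only compiles when placed in a package under
org.apache.spark, and the names here are my own:

package org.apache.spark.storagecheck

import org.apache.spark.SparkEnv
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.RDDBlockId

object ReplicaLocations {
  // For each partition index, ask the BlockManagerMaster which
  // BlockManagerIds currently hold a replica of that partition's block.
  // NOTE: getLocations is an internal API, not a public one.
  def locations(rdd: RDD[_]): Seq[(Int, Seq[String])] =
    (0 until rdd.partitions.length).map { i =>
      val ids = SparkEnv.get.blockManager.master.getLocations(RDDBlockId(rdd.id, i))
      i -> ids.map(_.hostPort)
    }
}

To use it: persist the RDD with StorageLevel.MEMORY_ONLY_2, run an action such
as count() so the replicated blocks actually exist, then print
ReplicaLocations.locations(rdd).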


Re: Spark performance gains for small queries

2015-01-23 Thread Matei Zaharia
It's hard to tell without more details, but the start-up latency in Hive can 
sometimes be high, especially if you are running Hive on MapReduce. MR just 
takes 20-30 seconds per job to spin up even if the job is doing nothing.

For real use of Spark SQL for short queries by the way, I'd recommend using the 
JDBC server so that you can have a long-running Spark process. It gets quite a 
bit faster after the first few queries.

Matei

 On Jan 22, 2015, at 10:22 PM, Saumitra Shahapure (Vizury) 
 saumitra.shahap...@vizury.com wrote:
 
 Hello,
 
 We were comparing the performance of some of our production Hive queries
 between Hive and Spark. We compared Hive (0.13) + Hadoop (1.2.1) against both
 Spark 0.9 and 1.1. We could see that the performance gains in Spark have
 been good.
 
 We tried a very simple query,
 select count(*) from T where col3=123
 in both Spark SQL and Hive (with hive.map.aggr=true), and found that Spark's
 performance was 2x better than Hive's (60 sec vs 120 sec). Table T is stored
 in S3 as a single 600MB GZIP file.
 
 My question is: why is Spark faster than Hive here? In both cases the file
 will be downloaded, uncompressed, and the lines counted by a single process.
 In Hive's case the reducer will be an identity function, since
 hive.map.aggr is true.
 
 Note that disk spills and network I/O are very low in Hive's case as well.
 --
 Regards,
 Saumitra Shahapure


-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: spark 1.1.0 (w/ hadoop 2.4) vs aws java sdk 1.7.2

2015-01-23 Thread Sean Owen
Did you use spark.files.userClassPathFirst = true? It's exactly for
this kind of problem.
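
For example (the app jar and main class are placeholders):

spark-submit --conf spark.files.userClassPathFirst=true \
  --class com.example.YourStreamingApp your-streaming-app.jar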

On Fri, Jan 23, 2015 at 4:42 AM, William-Smith
williamsmith.m...@gmail.com wrote:
 I have had the same issue while using HttpClient from AWS EMR Spark Streaming
 to post to a Node.js server.

 I have found, using ClassLoader.getResource("org/apache/http/client/HttpClient"),
 that the class is being loaded from the spark-assembly-1.1.0-hadoop2.4.0.jar.

 That in itself is not the issue, because the version is 4.2.5, the same
 version I am using successfully on my local machine with Hadoop CDH 5.

 The issue is that HttpClient relies on httpcore, and there is an old
 commons-httpcore-1.3.jar as well as httpcore-4.5.2 in the spark-assembly
 jar.

 It looks like the old one is getting loaded first.

 So the fix might be to build the Spark jar myself without httpcore-1.3 and
 replace it on bootstrap. I will keep you posted on the outcome.





 --
 View this message in context: 
 http://apache-spark-developers-list.1001551.n3.nabble.com/spark-1-1-0-w-hadoop-2-4-vs-aws-java-sdk-1-7-2-tp8481p10250.html
 Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org


-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org