Re: Super slow caching in 1.3?

2015-04-27 Thread Christian Perez
Michael,

There is only one schema: both versions have 200 string columns in one file.

On Mon, Apr 20, 2015 at 9:08 AM, Evo Eftimov evo.efti...@isecc.com wrote:
 Now this is very important:



 “Normal RDDs” refers to “batch RDDs”. However, the default in-memory
 Serialization of RDDs which are part of a DStream is “Serialized” rather than
 actual (hydrated) Objects. The Spark documentation states that
 “Serialization” is required for space and garbage collection efficiency (but
 creates higher CPU load) – which makes sense considering the large number of
 RDDs which get discarded in a streaming app.



 So what does Databricks actually recommend as the object-oriented model for RDD
 elements used in Spark Streaming apps – flat or not – and can you provide a
 detailed description / spec of both?



 From: Michael Armbrust [mailto:mich...@databricks.com]
 Sent: Thursday, April 16, 2015 7:23 PM
 To: Evo Eftimov
 Cc: Christian Perez; user


 Subject: Re: Super slow caching in 1.3?



 Here are the types that we specialize; other types will be much slower.
 This is only for Spark SQL; normal RDDs do not serialize data that is
 cached.  I'll also note that until yesterday we were missing FloatType.

 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala#L154



 Christian, can you provide the schema of the fast and slow datasets?



 On Thu, Apr 16, 2015 at 10:14 AM, Evo Eftimov evo.efti...@isecc.com wrote:

 Michael what exactly do you mean by flattened version/structure here e.g.:

 1. An Object with only primitive data types as attributes
 2. An Object with  no more than one level of other Objects as attributes
 3. An Array/List of primitive types
 4. An Array/List of Objects

 This question is about RDDs in general, not necessarily RDDs in the context
 of Spark SQL.

 When answering, can you also score how bad the performance of each of the
 above options is?


 -Original Message-
 From: Christian Perez [mailto:christ...@svds.com]
 Sent: Thursday, April 16, 2015 6:09 PM
 To: Michael Armbrust
 Cc: user
 Subject: Re: Super slow caching in 1.3?

 Hi Michael,

 Good question! We checked 1.2 and found that it is also slow caching the
 same flat Parquet file. Caching other file formats of the same data was
 faster by up to a factor of ~2. Note that the Parquet file was created in
 Impala but the other formats were written by Spark SQL.

 Cheers,

 Christian

 On Mon, Apr 6, 2015 at 6:17 PM, Michael Armbrust mich...@databricks.com
 wrote:
 Do you think you are seeing a regression from 1.2?  Also, are you
 caching nested data or flat rows?  The in-memory caching is not really
 designed for nested data and so performs pretty slowly here (it's just
 falling back to Kryo and even then there are some locking issues).

 If so, would it be possible to try caching a flattened version?

 CACHE TABLE flattenedTable AS SELECT ... FROM parquetTable
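
 A minimal sketch of how such a flattening statement could be generated, assuming the nested schema is available as a plain Python dict. The schema, the column names and the `a_b` aliasing convention are hypothetical and only illustrate the shape of the statement; nothing here comes from Spark's own API.

```python
def flatten_columns(schema, prefix=""):
    """Yield dotted paths for every leaf field of a nested schema.

    `schema` is a plain dict: a leaf maps to a type name (str),
    a struct maps to another dict.
    """
    for name, value in schema.items():
        path = f"{prefix}.{name}" if prefix else name
        if isinstance(value, dict):
            # Recurse into the struct, extending the dotted path.
            yield from flatten_columns(value, path)
        else:
            yield path


def cache_table_sql(schema, source, target):
    """Build a CACHE TABLE ... AS SELECT statement with one flat column per leaf."""
    cols = ", ".join(
        f"{path} AS {path.replace('.', '_')}" for path in flatten_columns(schema)
    )
    return f"CACHE TABLE {target} AS SELECT {cols} FROM {source}"


# Hypothetical nested schema, for illustration only.
example_schema = {"id": "string", "address": {"city": "string", "zip": "string"}}
print(cache_table_sql(example_schema, "parquetTable", "flattenedTable"))
```

 The resulting string would then be passed to the SQL context; each nested leaf becomes a top-level column.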

 On Mon, Apr 6, 2015 at 5:00 PM, Christian Perez christ...@svds.com
 wrote:

 Hi all,

 Has anyone else noticed very slow time to cache a Parquet file? It
 takes 14 s per 235 MB (1 block) uncompressed node local Parquet file
 on M2 EC2 instances. Or are my expectations way off...

 Cheers,

 Christian

 --
 Christian Perez
 Silicon Valley Data Science
 Data Analyst
 christ...@svds.com
 @cp_phd

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
 additional commands, e-mail: user-h...@spark.apache.org








Re: Pyspark where do third parties libraries need to be installed under Yarn-client mode

2015-04-24 Thread Christian Perez
To run MLlib, you only need numpy on each node. For additional
dependencies, you can call spark-submit with the --py-files option and
add the .zip or .egg.

https://spark.apache.org/docs/latest/submitting-applications.html
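
As a rough sketch of the --py-files route for a pure-Python dependency: the helper below zips a package directory and builds (but does not run) a spark-submit argument list. The function names, the file names in the usage note and the `yarn-client` default are illustrative assumptions. Packages with compiled extensions, such as numpy, generally still need to be installed on every node.

```python
import os
import zipfile


def zip_package(package_dir, zip_path):
    """Zip a Python package directory so it can be passed to --py-files.

    Paths inside the archive are relative to the package's parent, so
    `import <package>` works once the zip is on the Python path.
    """
    parent = os.path.dirname(os.path.abspath(package_dir))
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(package_dir):
            for name in files:
                if name.endswith(".py"):
                    full = os.path.abspath(os.path.join(root, name))
                    zf.write(full, os.path.relpath(full, parent))
    return zip_path


def spark_submit_args(app, py_files, master="yarn-client"):
    """Return the spark-submit argument list (not executed here)."""
    return ["spark-submit", "--master", master, "--py-files", ",".join(py_files), app]
```

For example, `spark_submit_args("my_job.py", ["deps.zip"])` (hypothetical file names) yields the command line to run from the client machine.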

Cheers,

Christian

On Fri, Apr 24, 2015 at 1:56 AM, Hoai-Thu Vuong thuv...@gmail.com wrote:
 I use sudo pip install ... on each machine in the cluster, and don't think
 about how to submit the library.

 On Fri, Apr 24, 2015 at 4:21 AM dusts66 dustin.davids...@gmail.com wrote:

 I am trying to figure out Python library management.  So my question is:
 where do third-party Python libraries (e.g. numpy, scipy, etc.) need to
 exist if I am running a Spark job via 'spark-submit' against my cluster in
 'yarn client' mode?  Do the libraries need to exist only on the client (i.e.
 the server executing the driver code), or do the libraries need to exist on
 the datanode/worker nodes where the tasks are executed?  The documentation
 seems to indicate that under 'yarn client' the libraries are only needed on
 the client machine, not the entire cluster.  If the libraries are needed
 across all cluster machines, any suggestions on a deployment strategy or
 dependency management model that works well?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-where-do-third-parties-libraries-need-to-be-installed-under-Yarn-client-mode-tp22639.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Super slow caching in 1.3?

2015-04-16 Thread Christian Perez
Hi Michael,

Good question! We checked 1.2 and found that it is also slow caching
the same flat Parquet file. Caching other file formats of the same
data was faster by up to a factor of ~2. Note that the Parquet file
was created in Impala but the other formats were written by Spark SQL.

Cheers,

Christian

On Mon, Apr 6, 2015 at 6:17 PM, Michael Armbrust mich...@databricks.com wrote:
 Do you think you are seeing a regression from 1.2?  Also, are you caching
 nested data or flat rows?  The in-memory caching is not really designed for
 nested data and so performs pretty slowly here (it's just falling back to
 Kryo and even then there are some locking issues).

 If so, would it be possible to try caching a flattened version?

 CACHE TABLE flattenedTable AS SELECT ... FROM parquetTable

 On Mon, Apr 6, 2015 at 5:00 PM, Christian Perez christ...@svds.com wrote:

 Hi all,

 Has anyone else noticed very slow time to cache a Parquet file? It
 takes 14 s per 235 MB (1 block) uncompressed node local Parquet file
 on M2 EC2 instances. Or are my expectations way off...

 Cheers,

 Christian




Super slow caching in 1.3?

2015-04-06 Thread Christian Perez
Hi all,

Has anyone else noticed very slow time to cache a Parquet file? It
takes 14 s per 235 MB (1 block) uncompressed node local Parquet file
on M2 EC2 instances. Or are my expectations way off...
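
For scale, the figures above can be turned into an effective caching rate. This is only back-of-envelope arithmetic on the two numbers quoted in this message, not a benchmark.

```python
# Figures quoted in the message above.
block_mb = 235.0       # uncompressed size of one Parquet block, in MB
cache_seconds = 14.0   # observed time to cache that block


def throughput_mb_per_s(size_mb, seconds):
    """Return the effective caching rate in MB/s."""
    return size_mb / seconds


rate = throughput_mb_per_s(block_mb, cache_seconds)
print(f"{rate:.1f} MB/s per block")  # prints "16.8 MB/s per block"
```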

Cheers,

Christian




Re: persist(MEMORY_ONLY) takes lot of time

2015-04-02 Thread Christian Perez
+1.

Caching is way too slow.

On Wed, Apr 1, 2015 at 12:33 PM, SamyaMaiti samya.maiti2...@gmail.com wrote:
 Hi Experts,

 I have a Parquet dataset of 550 MB (9 blocks) in HDFS. I want to run SQL
 queries on it repeatedly.

 A few questions:

 1. When I do the below (persist to memory after reading from disk), it takes
 a lot of time to persist to memory. Any suggestions on how to tune this?

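  import org.apache.spark.storage.StorageLevel.MEMORY_ONLY

  val inputP = sqlContext.parquetFile("some HDFS path")  // load the Parquet dataset
  inputP.registerTempTable("sample_table")
  inputP.persist(MEMORY_ONLY)                            // lazy: nothing is cached yet
  val result = sqlContext.sql("some sql query")
  result.count                                           // first action reads from disk and fills the cache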

 Note: Once the data is persisted to memory, it takes a fraction of a second to
 return the query result from the second query onwards. So my concern is how to
 reduce the time when the data is first loaded into the cache.
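
 One way to quantify the gap between the first (cold) run and later (warm) runs is a small timing harness. The sketch below is generic Python; `run_query` is a hypothetical stand-in for whatever triggers the query (for instance a function that calls `sqlContext.sql(...).count()` in PySpark).

```python
import time


def time_runs(run_query, repeats=3):
    """Call `run_query` several times and return each wall-clock duration in seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        run_query()
        durations.append(time.perf_counter() - start)
    return durations


def summarize(durations):
    """Split timings into the first (cold) run and the mean of later (warm) runs."""
    cold, warm = durations[0], durations[1:]
    return {"cold_s": cold, "warm_mean_s": sum(warm) / len(warm) if warm else None}
```

 Running this once with the persist call and once without it gives comparable numbers for both setups.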


 2. I have observed that if I omit the line below,
  inputP.persist(MEMORY_ONLY)
 the first query execution is comparatively quick (say it takes 1 min),
 as the load-to-memory time is saved, but to my surprise the second time
 I run the same query it takes 30 sec, as inputP is not constructed
 from disk again (checked from the UI).

  So my question is: does Spark use some kind of internal caching for inputP
 in this scenario?

 Thanks in advance

 Regards,
 Sam



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/persist-MEMORY-ONLY-takes-lot-of-time-tp22343.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: input size too large | Performance issues with Spark

2015-04-02 Thread Christian Perez
To Akhil's point, see "Tuning Data Structures" in the tuning guide: avoid
standard collection classes such as HashMap.
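
The tuning guide's advice concerns JVM objects, but the same effect is easy to see in any runtime. As a rough CPython analogue (an illustration only, not a JVM measurement), a hash map holding three numbers occupies noticeably more memory than a packed array of the same values:

```python
import sys
from array import array

# One record with three numeric fields, stored two ways.
as_dict = {"a": 1.0, "b": 2.0, "c": 3.0}   # hash map per record
as_array = array("d", [1.0, 2.0, 3.0])     # packed primitive doubles

dict_bytes = sys.getsizeof(as_dict)    # container only; keys and values are extra
array_bytes = sys.getsizeof(as_array)  # includes the packed values

print(dict_bytes, array_bytes)
```

Multiplied across millions of records, that per-record overhead is what inflates the cached RDD size.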

With fewer machines, try running 4 or 5 cores per executor and only
3-4 executors (1 per node):
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/.
This ought to reduce the shuffle performance hit (can someone else confirm?).

For #7, see spark.sql.shuffle.partitions (default: 200).
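
The executor layout suggested above (one executor per node, 4 or 5 cores each) can be written down as spark-submit flags. This sketch assumes the cluster runs on YARN, where spark-submit accepts --num-executors and --executor-cores; the helper name is made up, and memory settings are left out because the thread gives no figures for them.

```python
def executor_layout(nodes, cores_per_executor=5, executors_per_node=1):
    """Return spark-submit flags for a fixed number of executors per node."""
    num_executors = nodes * executors_per_node
    return [
        "--num-executors", str(num_executors),
        "--executor-cores", str(cores_per_executor),
    ]


# 3 nodes, 1 executor per node, 5 cores each (figures from the suggestion above).
print(" ".join(executor_layout(nodes=3)))  # prints "--num-executors 3 --executor-cores 5"
```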

On Sun, Mar 29, 2015 at 7:57 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
 Go through this once, if you haven't read it already.
 https://spark.apache.org/docs/latest/tuning.html

 Thanks
 Best Regards

 On Sat, Mar 28, 2015 at 7:33 PM, nsareen nsar...@gmail.com wrote:

 Hi All,

 I'm facing performance issues with my Spark implementation. While briefly
 investigating the WebUI logs, I noticed that my RDD size is 55 GB, the
 Shuffle Write is 10 GB, and the Input Size is 200 GB. The application is a web
 application which does predictive analytics, so we keep most of our data in
 memory. This observation was for only 30 minutes' usage of the application by
 a single user. We anticipate at least 10-15 users of the application sending
 requests in parallel, which makes me a bit nervous.

 One constraint we have is that we do not have many nodes in the cluster;
 we may end up with 3-4 machines at best, but they can be scaled up
 vertically, each having 24 cores / 512 GB RAM etc., which can allow us to
 make a virtual 10-15 node cluster.

 Even then the input size and shuffle write are too high for my liking. Any
 suggestions in this regard will be greatly appreciated, as there aren't
 many resources on the net for handling performance issues such as these.

 Some pointers on my application's data structures and design:

 1) The RDD is a JavaPairRDD with Key / Value as CustomPOJO: the Key contains
 3-4 HashMaps and the Value contains 1 HashMap.
 2) Data is loaded via JdbcRDD during application startup, which also tends
 to take a lot of time, since we massage the data once it is fetched from
 the DB and then save it as a JavaPairRDD.
 3) Most of the data is structured, but we are still using JavaPairRDD; we
 have not explored the option of Spark SQL though.
 4) We have only one SparkContext, which caters to all the requests coming
 into the application from various users.
 5) During a single user session, the user can send 3-4 parallel stages
 consisting of Map / Group By / Join / Reduce etc.
 6) We have to change the RDD structure using different types of group-by
 operations, since the user can drill down / drill up the data
 (aggregation at a higher / lower level). This is where we make use of
 groupBy's, but there is a cost associated with this.
 7) We have observed that the initial RDDs we create have 40-odd
 partitions, but after some stage executions like groupBy's the partitions
 increase to 200 or so. This was odd, and we haven't figured out why this
 happens.

 In summary, we want to use Spark to give us the capability to process our
 in-memory data structures very fast, as well as scale to a larger volume
 when required in the future.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/input-size-too-large-Performance-issues-with-Spark-tp22270.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: saveAsTable broken in v1.3 DataFrames?

2015-03-20 Thread Christian Perez
Any other users interested in a feature
DataFrame.saveAsExternalTable() for making _useful_ external tables in
Hive, or am I the only one? Bueller? If I start a PR for this, will it
be taken seriously?

On Thu, Mar 19, 2015 at 9:34 AM, Christian Perez christ...@svds.com wrote:
 Hi Yin,

 Thanks for the clarification. My first reaction is that if this is the
 intended behavior, it is a wasted opportunity. Why create a managed
 table in Hive that cannot be read from inside Hive? I think I
 understand now that you are essentially piggybacking on Hive's
 metastore to persist table info between/across sessions, but I imagine
 others might expect more (as I have.)

 We find ourselves wanting to do work in Spark and persist the results
 where other users (e.g. analysts using Tableau connected to
 Hive/Impala) can explore it. I imagine this is very common. I can, of
 course, save it as parquet and create an external table in hive (which
 I will do now), but saveAsTable seems much less useful to me now.

 Any other opinions?

 Cheers,

 C

 On Thu, Mar 19, 2015 at 9:18 AM, Yin Huai yh...@databricks.com wrote:
 I meant table properties and serde properties are used to store metadata of
 a Spark SQL data source table. We do not set other fields like SerDe lib.
 For a user, the output of DESCRIBE EXTENDED/FORMATTED on a data source table
 should not show unrelated stuff like Serde lib and InputFormat. I have
 created https://issues.apache.org/jira/browse/SPARK-6413 to track the
 improvement on the output of DESCRIBE statement.

 On Thu, Mar 19, 2015 at 12:11 PM, Yin Huai yh...@databricks.com wrote:

 Hi Christian,

 Your table is stored correctly in Parquet format.

 For saveAsTable, the table created is not a Hive table, but a Spark SQL
 data source table
 (http://spark.apache.org/docs/1.3.0/sql-programming-guide.html#data-sources).
 We are only using Hive's metastore to store the metadata (to be specific,
 only table properties and serde properties). When you look at table
 property, there will be a field called spark.sql.sources.provider and the
 value will be org.apache.spark.sql.parquet.DefaultSource. You can also
 look at your files in the file system. They are stored by Parquet.
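
 As a small illustration of the check described above, the sketch below classifies a table from its metastore table properties. It assumes the properties have already been fetched into a plain dict (how they are fetched is not shown), and treating a missing `spark.sql.sources.provider` key as "regular Hive table" is an assumption of this example.

```python
PROVIDER_KEY = "spark.sql.sources.provider"


def describe_table_kind(table_properties):
    """Classify a metastore table from its table properties (a plain dict)."""
    provider = table_properties.get(PROVIDER_KEY)
    if provider is None:
        # Assumption: no provider property means Spark SQL did not create it
        # as a data source table.
        return "regular Hive table"
    return f"Spark SQL data source table (provider: {provider})"


# Property value as quoted in the message above.
props = {PROVIDER_KEY: "org.apache.spark.sql.parquet.DefaultSource"}
print(describe_table_kind(props))
```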

 Thanks,

 Yin

 On Thu, Mar 19, 2015 at 12:00 PM, Christian Perez christ...@svds.com
 wrote:

 Hi all,

 DataFrame.saveAsTable creates a managed table in Hive (v0.13 on
 CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong*
 schema _and_ storage format in the Hive metastore, so that the table
 cannot be read from inside Hive. Spark itself can read the table, but
 Hive throws a Serialization error because it doesn't know it is
 Parquet.

 val df = sc.parallelize( Array((1,2), (3,4)) ).toDF("education", "income")
 df.saveAsTable("spark_test_foo")

 Expected:

 COLUMNS(
   education BIGINT,
   income BIGINT
 )

 SerDe Library:
 org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
 InputFormat:
 org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

 Actual:

 COLUMNS(
   col array<string> COMMENT 'from deserializer'
 )

 SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
 InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat

 ---

 Manually changing schema and storage restores access in Hive and
 doesn't affect Spark. Note also that Hive's table property
 spark.sql.sources.schema is correct. At first glance, it looks like
 the schema data is serialized when sent to Hive but not deserialized
 properly on receive.

 I'm tracing execution through source code... but before I get any
 deeper, can anyone reproduce this behavior?

 Cheers,

 Christian




saveAsTable broken in v1.3 DataFrames?

2015-03-19 Thread Christian Perez
Hi all,

DataFrame.saveAsTable creates a managed table in Hive (v0.13 on
CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong*
schema _and_ storage format in the Hive metastore, so that the table
cannot be read from inside Hive. Spark itself can read the table, but
Hive throws a Serialization error because it doesn't know it is
Parquet.

val df = sc.parallelize( Array((1,2), (3,4)) ).toDF("education", "income")
df.saveAsTable("spark_test_foo")

Expected:

COLUMNS(
  education BIGINT,
  income BIGINT
)

SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

Actual:

COLUMNS(
  col array<string> COMMENT 'from deserializer'
)

SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat

---

Manually changing schema and storage restores access in Hive and
doesn't affect Spark. Note also that Hive's table property
spark.sql.sources.schema is correct. At first glance, it looks like
the schema data is serialized when sent to Hive but not deserialized
properly on receive.

I'm tracing execution through source code... but before I get any
deeper, can anyone reproduce this behavior?

Cheers,

Christian




Re: saveAsTable broken in v1.3 DataFrames?

2015-03-19 Thread Christian Perez
Hi Yin,

Thanks for the clarification. My first reaction is that if this is the
intended behavior, it is a wasted opportunity. Why create a managed
table in Hive that cannot be read from inside Hive? I think I
understand now that you are essentially piggybacking on Hive's
metastore to persist table info between/across sessions, but I imagine
others might expect more (as I have.)

We find ourselves wanting to do work in Spark and persist the results
where other users (e.g. analysts using Tableau connected to
Hive/Impala) can explore it. I imagine this is very common. I can, of
course, save it as parquet and create an external table in hive (which
I will do now), but saveAsTable seems much less useful to me now.
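
A sketch of that workaround: once the DataFrame has been written out as Parquet files, a Hive external table can be declared over the directory. The helper below only builds the DDL string. The column names and types are the ones from the original report; the HDFS path is hypothetical, and `STORED AS PARQUET` assumes Hive 0.13 or later.

```python
def external_parquet_ddl(table, columns, location):
    """Build a Hive CREATE EXTERNAL TABLE statement over existing Parquet files.

    `columns` is a list of (name, hive_type) pairs.
    """
    cols = ",\n  ".join(f"{name} {hive_type}" for name, hive_type in columns)
    return (
        f"CREATE EXTERNAL TABLE {table} (\n  {cols}\n)\n"
        f"STORED AS PARQUET\n"
        f"LOCATION '{location}'"
    )


# Column names and types from the thread; the HDFS path is hypothetical.
print(external_parquet_ddl(
    "spark_test_foo",
    [("education", "BIGINT"), ("income", "BIGINT")],
    "/user/hypothetical/spark_test_foo",
))
```

The declared column types need to match what was actually written to the Parquet files, so they are worth checking against the DataFrame's schema first.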

Any other opinions?

Cheers,

C

On Thu, Mar 19, 2015 at 9:18 AM, Yin Huai yh...@databricks.com wrote:
 I meant table properties and serde properties are used to store metadata of
 a Spark SQL data source table. We do not set other fields like SerDe lib.
 For a user, the output of DESCRIBE EXTENDED/FORMATTED on a data source table
 should not show unrelated stuff like Serde lib and InputFormat. I have
 created https://issues.apache.org/jira/browse/SPARK-6413 to track the
 improvement on the output of DESCRIBE statement.

 On Thu, Mar 19, 2015 at 12:11 PM, Yin Huai yh...@databricks.com wrote:

 Hi Christian,

 Your table is stored correctly in Parquet format.

 For saveAsTable, the table created is not a Hive table, but a Spark SQL
 data source table
 (http://spark.apache.org/docs/1.3.0/sql-programming-guide.html#data-sources).
 We are only using Hive's metastore to store the metadata (to be specific,
 only table properties and serde properties). When you look at table
 property, there will be a field called spark.sql.sources.provider and the
 value will be org.apache.spark.sql.parquet.DefaultSource. You can also
 look at your files in the file system. They are stored by Parquet.

 Thanks,

 Yin

 On Thu, Mar 19, 2015 at 12:00 PM, Christian Perez christ...@svds.com
 wrote:

 Hi all,

 DataFrame.saveAsTable creates a managed table in Hive (v0.13 on
 CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong*
 schema _and_ storage format in the Hive metastore, so that the table
 cannot be read from inside Hive. Spark itself can read the table, but
 Hive throws a Serialization error because it doesn't know it is
 Parquet.

 val df = sc.parallelize( Array((1,2), (3,4)) ).toDF("education", "income")
 df.saveAsTable("spark_test_foo")

 Expected:

 COLUMNS(
   education BIGINT,
   income BIGINT
 )

 SerDe Library:
 org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
 InputFormat:
 org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

 Actual:

 COLUMNS(
   col array<string> COMMENT 'from deserializer'
 )

 SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
 InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat

 ---

 Manually changing schema and storage restores access in Hive and
 doesn't affect Spark. Note also that Hive's table property
 spark.sql.sources.schema is correct. At first glance, it looks like
 the schema data is serialized when sent to Hive but not deserialized
 properly on receive.

 I'm tracing execution through source code... but before I get any
 deeper, can anyone reproduce this behavior?

 Cheers,

 Christian
