[jira] [Commented] (SPARK-17672) Spark 2.0 history server web Ui takes too long for a single application

2016-09-26 Thread Gang Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525144#comment-15525144
 ] 

Gang Wu commented on SPARK-17672:
-

They are similar but different.
This JIRA deals with how one specific appId is looked up from the whole list 
(returned from the map); SPARK-17671 deals with how many app infos are fetched 
from the map.

> Spark 2.0 history server web Ui takes too long for a single application
> ---
>
> Key: SPARK-17672
> URL: https://issues.apache.org/jira/browse/SPARK-17672
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.0.0
>Reporter: Gang Wu
>
> When there are 10K application histories in the history server back end, it 
> can take a very long time to even get a single application's history page. 
> After some investigation, I found the root cause to be the following piece of 
> code: 
> {code:title=OneApplicationResource.scala|borderStyle=solid}
> @Produces(Array(MediaType.APPLICATION_JSON))
> private[v1] class OneApplicationResource(uiRoot: UIRoot) {
>   @GET
>   def getApp(@PathParam("appId") appId: String): ApplicationInfo = {
> val apps = uiRoot.getApplicationInfoList.find { _.id == appId }
> apps.getOrElse(throw new NotFoundException("unknown app: " + appId))
>   }
> }
> {code}
> Although all application history infos are stored in a LinkedHashMap, the 
> code here transforms the map into an iterator and then uses the find() API, 
> which is O(n), instead of an O(1) map.get() lookup.
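
For illustration only (an editorial sketch, not the actual Spark patch): if UIRoot exposed a map-backed accessor, here called getApplicationInfo (an assumed name), the handler could do the O(1) lookup directly. UIRoot, ApplicationInfo and NotFoundException are the types from the excerpt above.

{code:title=OneApplicationResource.scala (illustrative sketch)|borderStyle=solid}
import javax.ws.rs.{GET, PathParam, Produces}
import javax.ws.rs.core.MediaType

@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class OneApplicationResource(uiRoot: UIRoot) {
  @GET
  def getApp(@PathParam("appId") appId: String): ApplicationInfo = {
    // Assumed accessor: returns Option[ApplicationInfo] backed by the
    // LinkedHashMap's get(), so the lookup is O(1) instead of an O(n) scan.
    uiRoot.getApplicationInfo(appId)
      .getOrElse(throw new NotFoundException("unknown app: " + appId))
  }
}
{code}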






[jira] [Commented] (SPARK-17672) Spark 2.0 history server web Ui takes too long for a single application

2016-09-26 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525137#comment-15525137
 ] 

Sean Owen commented on SPARK-17672:
---

I don't see how this is separate from SPARK-17671?

> Spark 2.0 history server web Ui takes too long for a single application
> ---
>
> Key: SPARK-17672
> URL: https://issues.apache.org/jira/browse/SPARK-17672
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.0.0
>Reporter: Gang Wu
>
> When there are 10K application histories in the history server back end, it 
> can take a very long time to even get a single application's history page. 
> After some investigation, I found the root cause to be the following piece of 
> code: 
> {code:title=OneApplicationResource.scala|borderStyle=solid}
> @Produces(Array(MediaType.APPLICATION_JSON))
> private[v1] class OneApplicationResource(uiRoot: UIRoot) {
>   @GET
>   def getApp(@PathParam("appId") appId: String): ApplicationInfo = {
> val apps = uiRoot.getApplicationInfoList.find { _.id == appId }
> apps.getOrElse(throw new NotFoundException("unknown app: " + appId))
>   }
> }
> {code}
> Although all application history infos are stored in a LinkedHashMap, the 
> code here transforms the map into an iterator and then uses the find() API, 
> which is O(n), instead of an O(1) map.get() lookup.






[jira] [Comment Edited] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525101#comment-15525101
 ] 

Russell Spitzer edited comment on SPARK-17673 at 9/27/16 5:17 AM:
--

{code}== Parsed Logical Plan ==
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102, col2#103]
: +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col1#102, col2#103]
  +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Analyzed Logical Plan ==
id: string, min: int
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102, col2#103]
: +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col1#102, col2#103]
  +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Optimized Logical Plan ==
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102]
: +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col2#103]
  +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Physical Plan ==
Union
:- *HashAggregate(keys=[id#101], functions=[min(col1#102)], output=[id#101, 
min#128])
:  +- Exchange hashpartitioning(id#101, 200)
: +- *HashAggregate(keys=[id#101], functions=[partial_min(col1#102)], 
output=[id#101, min#182])
:+- *Scan 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118 
[id#101,col1#102]
+- *HashAggregate(keys=[id#101], functions=[min(col2#103)], output=[id#101, 
min#137])
   +- ReusedExchange [id#101, min#190], Exchange hashpartitioning(id#101, 200)
{code}

So the relations do look exactly the same in the optimized plan, even though 
they are not.


was (Author: rspitzer):
{code}== Parsed Logical Plan ==
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102, col2#103]
: +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col1#102, col2#103]
  +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Analyzed Logical Plan ==
id: string, min: int
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102, col2#103]
: +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col1#102, col2#103]
  +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Optimized Logical Plan ==
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102]
: +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col2#103]
  +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Physical Plan ==
Union
:- *HashAggregate(keys=[id#101], functions=[min(col1#102)], output=[id#101, 
min#128])
:  +- Exchange hashpartitioning(id#101, 200)
: +- *HashAggregate(keys=[id#101], functions=[partial_min(col1#102)], 
output=[id#101, min#182])
:+- *Scan 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118 
[id#101,col1#102]
+- *HashAggregate(keys=[id#101], functions=[min(col2#103)], output=[id#101, 
min#137])
   +- ReusedExchange [id#101, min#190], Exchange hashpartitioning(id#101, 200)
{code}

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect 
> results:
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
> 

[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525101#comment-15525101
 ] 

Russell Spitzer commented on SPARK-17673:
-

{code}== Parsed Logical Plan ==
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102, col2#103]
: +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col1#102, col2#103]
  +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Analyzed Logical Plan ==
id: string, min: int
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102, col2#103]
: +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col1#102, col2#103]
  +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Optimized Logical Plan ==
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102]
: +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col2#103]
  +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Physical Plan ==
Union
:- *HashAggregate(keys=[id#101], functions=[min(col1#102)], output=[id#101, 
min#128])
:  +- Exchange hashpartitioning(id#101, 200)
: +- *HashAggregate(keys=[id#101], functions=[partial_min(col1#102)], 
output=[id#101, min#182])
:+- *Scan 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118 
[id#101,col1#102]
+- *HashAggregate(keys=[id#101], functions=[min(col2#103)], output=[id#101, 
min#137])
   +- ReusedExchange [id#101, min#190], Exchange hashpartitioning(id#101, 200)
{code}

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect 
> results:
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 




[jira] [Updated] (SPARK-17680) Unicode Character Support for Column Names and Comments

2016-09-26 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-17680:

Description: 
Spark SQL supports Unicode characters for column names when they are specified 
within backticks (`). When Hive support is enabled, the version of the Hive 
metastore must be higher than 0.12; see HIVE-6013 
(https://issues.apache.org/jira/browse/HIVE-6013): the Hive metastore supports 
Unicode characters for column names since 0.13.

In Spark SQL, table comments and view comments always allow Unicode characters 
without backticks.


  was:
When the version of the Hive metastore is higher than 0.12, Spark SQL supports 
Unicode characters for column names when specified within backticks(`). See the 
JIRA: https://issues.apache.org/jira/browse/HIVE-6013 Hive metastore supports 
Unicode characters for column names since 0.13 

In Spark SQL, table comments, and view comments always allow Unicode characters 
without backticks.



> Unicode Character Support for Column Names and Comments
> ---
>
> Key: SPARK-17680
> URL: https://issues.apache.org/jira/browse/SPARK-17680
> Project: Spark
>  Issue Type: Test
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Xiao Li
>
> Spark SQL supports Unicode characters for column names when they are 
> specified within backticks (`). When Hive support is enabled, the version of 
> the Hive metastore must be higher than 0.12; see HIVE-6013 
> (https://issues.apache.org/jira/browse/HIVE-6013): the Hive metastore supports 
> Unicode characters for column names since 0.13.
> In Spark SQL, table comments and view comments always allow Unicode 
> characters without backticks.
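
As a quick illustration of the backtick rule described above (an editorial sketch, not from the JIRA; it assumes a SparkSession named {{spark}} as in spark-shell, and the column and view names are made up):

{code:title=UnicodeColumnNames.scala (illustrative sketch)|borderStyle=solid}
import spark.implicits._

// DataFrame column names may contain Unicode characters directly.
val df = Seq((1, "张")).toDF("列1", "名字")
df.createOrReplaceTempView("people")

// In SQL text, such column names are referenced inside backticks.
spark.sql("SELECT `列1`, `名字` FROM people WHERE `列1` = 1").show()
{code}

Persisting such a table through the Hive metastore additionally requires metastore 0.13 or later, as the description notes.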






[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525096#comment-15525096
 ] 

Russell Spitzer commented on SPARK-17673:
-

Ah, yeah, there would definitely be different pruning in both "sources". 
Getting the optimized plan now.

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect 
> results:
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 






[jira] [Assigned] (SPARK-17680) Unicode Character Support for Column Names and Comments

2016-09-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-17680:


Assignee: Apache Spark

> Unicode Character Support for Column Names and Comments
> ---
>
> Key: SPARK-17680
> URL: https://issues.apache.org/jira/browse/SPARK-17680
> Project: Spark
>  Issue Type: Test
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Xiao Li
>Assignee: Apache Spark
>
> When the version of the Hive metastore is higher than 0.12, Spark SQL 
> supports Unicode characters for column names when they are specified within 
> backticks (`). See HIVE-6013 (https://issues.apache.org/jira/browse/HIVE-6013): 
> the Hive metastore supports Unicode characters for column names since 0.13.
> In Spark SQL, table comments and view comments always allow Unicode 
> characters without backticks.






[jira] [Assigned] (SPARK-17680) Unicode Character Support for Column Names and Comments

2016-09-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-17680:


Assignee: (was: Apache Spark)

> Unicode Character Support for Column Names and Comments
> ---
>
> Key: SPARK-17680
> URL: https://issues.apache.org/jira/browse/SPARK-17680
> Project: Spark
>  Issue Type: Test
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Xiao Li
>
> When the version of the Hive metastore is higher than 0.12, Spark SQL 
> supports Unicode characters for column names when they are specified within 
> backticks (`). See HIVE-6013 (https://issues.apache.org/jira/browse/HIVE-6013): 
> the Hive metastore supports Unicode characters for column names since 0.13.
> In Spark SQL, table comments and view comments always allow Unicode 
> characters without backticks.






[jira] [Commented] (SPARK-17680) Unicode Character Support for Column Names and Comments

2016-09-26 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525090#comment-15525090
 ] 

Apache Spark commented on SPARK-17680:
--

User 'gatorsmile' has created a pull request for this issue:
https://github.com/apache/spark/pull/15255

> Unicode Character Support for Column Names and Comments
> ---
>
> Key: SPARK-17680
> URL: https://issues.apache.org/jira/browse/SPARK-17680
> Project: Spark
>  Issue Type: Test
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Xiao Li
>
> When the version of the Hive metastore is higher than 0.12, Spark SQL 
> supports Unicode characters for column names when they are specified within 
> backticks (`). See HIVE-6013 (https://issues.apache.org/jira/browse/HIVE-6013): 
> the Hive metastore supports Unicode characters for column names since 0.13.
> In Spark SQL, table comments and view comments always allow Unicode 
> characters without backticks.






[jira] [Created] (SPARK-17680) Unicode Character Support for Column Names and Comments

2016-09-26 Thread Xiao Li (JIRA)
Xiao Li created SPARK-17680:
---

 Summary: Unicode Character Support for Column Names and Comments
 Key: SPARK-17680
 URL: https://issues.apache.org/jira/browse/SPARK-17680
 Project: Spark
  Issue Type: Test
  Components: SQL
Affects Versions: 2.1.0
Reporter: Xiao Li


When the version of the Hive metastore is higher than 0.12, Spark SQL supports 
Unicode characters for column names when they are specified within backticks (`). 
See HIVE-6013 (https://issues.apache.org/jira/browse/HIVE-6013): the Hive 
metastore supports Unicode characters for column names since 0.13.

In Spark SQL, table comments and view comments always allow Unicode characters 
without backticks.







[jira] [Assigned] (SPARK-17679) Remove unnecessary Py4J ListConverter patch

2016-09-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-17679:


Assignee: (was: Apache Spark)

> Remove unnecessary Py4J ListConverter patch
> ---
>
> Key: SPARK-17679
> URL: https://issues.apache.org/jira/browse/SPARK-17679
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.0.0
>Reporter: Jason White
>Priority: Minor
>
> In SPARK-6949 davies documented a couple of bugs with Py4J that prevented 
> Spark from registering a converter for date and datetime objects. Patched in 
> https://github.com/apache/spark/pull/5570.
> Specifically, https://github.com/bartdag/py4j/issues/160 dealt with 
> ListConverter automatically converting bytearrays into ArrayLists instead of 
> leaving them alone.
> Py4J #160 has since been fixed upstream, as of the 0.9 release a couple of 
> months after Spark #5570. According to spark-core's pom.xml, we're using 
> 0.10.3.
> We should remove this patch on ListConverter since the upstream package no 
> longer has this issue.






[jira] [Commented] (SPARK-17679) Remove unnecessary Py4J ListConverter patch

2016-09-26 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525059#comment-15525059
 ] 

Apache Spark commented on SPARK-17679:
--

User 'JasonMWhite' has created a pull request for this issue:
https://github.com/apache/spark/pull/15254

> Remove unnecessary Py4J ListConverter patch
> ---
>
> Key: SPARK-17679
> URL: https://issues.apache.org/jira/browse/SPARK-17679
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.0.0
>Reporter: Jason White
>Priority: Minor
>
> In SPARK-6949 davies documented a couple of bugs with Py4J that prevented 
> Spark from registering a converter for date and datetime objects. Patched in 
> https://github.com/apache/spark/pull/5570.
> Specifically, https://github.com/bartdag/py4j/issues/160 dealt with 
> ListConverter automatically converting bytearrays into ArrayLists instead of 
> leaving them alone.
> Py4J #160 has since been fixed upstream, as of the 0.9 release a couple of 
> months after Spark #5570. According to spark-core's pom.xml, we're using 
> 0.10.3.
> We should remove this patch on ListConverter since the upstream package no 
> longer has this issue.






[jira] [Assigned] (SPARK-17679) Remove unnecessary Py4J ListConverter patch

2016-09-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-17679:


Assignee: Apache Spark

> Remove unnecessary Py4J ListConverter patch
> ---
>
> Key: SPARK-17679
> URL: https://issues.apache.org/jira/browse/SPARK-17679
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.0.0
>Reporter: Jason White
>Assignee: Apache Spark
>Priority: Minor
>
> In SPARK-6949 davies documented a couple of bugs with Py4J that prevented 
> Spark from registering a converter for date and datetime objects. Patched in 
> https://github.com/apache/spark/pull/5570.
> Specifically, https://github.com/bartdag/py4j/issues/160 dealt with 
> ListConverter automatically converting bytearrays into ArrayLists instead of 
> leaving them alone.
> Py4J #160 has since been fixed upstream, as of the 0.9 release a couple of 
> months after Spark #5570. According to spark-core's pom.xml, we're using 
> 0.10.3.
> We should remove this patch on ListConverter since the upstream package no 
> longer has this issue.






[jira] [Created] (SPARK-17679) Remove unnecessary Py4J ListConverter patch

2016-09-26 Thread Jason White (JIRA)
Jason White created SPARK-17679:
---

 Summary: Remove unnecessary Py4J ListConverter patch
 Key: SPARK-17679
 URL: https://issues.apache.org/jira/browse/SPARK-17679
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.0.0
Reporter: Jason White
Priority: Minor


In SPARK-6949 davies documented a couple of bugs with Py4J that prevented Spark 
from registering a converter for date and datetime objects. Patched in 
https://github.com/apache/spark/pull/5570.

Specifically, https://github.com/bartdag/py4j/issues/160 dealt with 
ListConverter automatically converting bytearrays into ArrayLists instead of 
leaving them alone.

Py4J #160 has since been fixed upstream, as of the 0.9 release a couple of 
months after Spark #5570. According to spark-core's pom.xml, we're using 0.10.3.

We should remove this patch on ListConverter since the upstream package no 
longer has this issue.






[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525036#comment-15525036
 ] 

Herman van Hovell commented on SPARK-17673:
---

Could you also share the optimized plan {{df.explain(true)}? I am wondering if 
the attributes are the same.

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect 
> results:
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 






[jira] [Comment Edited] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525036#comment-15525036
 ] 

Herman van Hovell edited comment on SPARK-17673 at 9/27/16 4:31 AM:


Could you also share the optimized plan {{df.explain(true)}}? I am wondering if 
the attributes are the same.


was (Author: hvanhovell):
Could you also share the optimized plan {{df.explain(true)}? I am wondering if 
the attributes are the same.

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect 
> results:
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 






[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525011#comment-15525011
 ] 

Reynold Xin commented on SPARK-17673:
-

The only thing differentiating the two sides of the plan is column pruning 
right? It is possible the issue I mentioned earlier is the culprit.


> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect 
> results:
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 






[jira] [Comment Edited] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525000#comment-15525000
 ] 

Reynold Xin edited comment on SPARK-17673 at 9/27/16 4:14 AM:
--

RowDataSourceScanExec.sameResult is probably the problem:

{code}
  // Ignore rdd when checking results
  override def sameResult(plan: SparkPlan): Boolean = plan match {
case other: RowDataSourceScanExec => relation == other.relation && metadata 
== other.metadata
case _ => false
  }
{code}

First glance -- it looks like it ignores predicate pushdown or column pruning 
entirely.
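
For illustration only (an editorial sketch, not the committed fix): one way to make the check sensitive to column pruning would be to also require the two scans to produce the same output attributes. {{output}} is the standard SparkPlan field holding the attributes a scan produces.

{code}
  // Editorial sketch (not the actual fix): also compare the columns each scan
  // actually produces, so two scans of the same relation with different
  // column pruning are no longer treated as interchangeable.
  override def sameResult(plan: SparkPlan): Boolean = plan match {
    case other: RowDataSourceScanExec =>
      relation == other.relation &&
        metadata == other.metadata &&
        // Comparing names and types catches pruning differences such as
        // [id, col1] vs [id, col2].
        output.map(a => (a.name, a.dataType)) ==
          other.output.map(a => (a.name, a.dataType))
    case _ => false
  }
{code}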



was (Author: rxin):
RowDataSourceScanExec.sameResult is probably the problem:

{code}
  // Ignore rdd when checking results
  override def sameResult(plan: SparkPlan): Boolean = plan match {
case other: RowDataSourceScanExec => relation == other.relation && metadata 
== other.metadata
case _ => false
  }
{code}

First glance -- it looks like it ignores predicate pushdown entirely.


> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect 
> results:
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 






[jira] [Comment Edited] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525007#comment-15525007
 ] 

Russell Spitzer edited comment on SPARK-17673 at 9/27/16 4:12 AM:
--

Looking at this plan 
{code}
Union
:- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
:  +- Exchange hashpartitioning(id#93, 200)
: +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
:+- *Scan 
org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 [id#93,col1#94]
+- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
   +- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
{code}
I see it reuses the hash aggregate from the partial min on col1 even though the 
hash aggregate that runs it does min col2. Am I reading that right? The column 
names don't even match so I'm confused how that gets through?


was (Author: rspitzer):
Looking at this plan 
```
Union
:- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
:  +- Exchange hashpartitioning(id#93, 200)
: +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
:+- *Scan 
org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 [id#93,col1#94]
+- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
   +- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)```

I see it reuses the hash aggregate from the partial min on col1 even though the 
hash aggregate that runs it does min col2. Am I reading that right? The column 
names don't even match so I'm confused how that gets through?

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect 
> results:
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 






[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525007#comment-15525007
 ] 

Russell Spitzer commented on SPARK-17673:
-

Looking at this plan 
```
Union
:- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
:  +- Exchange hashpartitioning(id#93, 200)
: +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
:+- *Scan 
org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 [id#93,col1#94]
+- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
   +- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)```

I see it reuses the hash aggregate from the partial min on col1 even though the 
hash aggregate that runs it does min col2. Am I reading that right? The column 
names don't even match so I'm confused how that gets through?

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect 
> results:
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 






[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525000#comment-15525000
 ] 

Reynold Xin commented on SPARK-17673:
-

RowDataSourceScanExec.sameResult is probably the problem:

{code}
  // Ignore rdd when checking results
  override def sameResult(plan: SparkPlan): Boolean = plan match {
case other: RowDataSourceScanExec => relation == other.relation && metadata 
== other.metadata
case _ => false
  }
{code}

First glance -- it looks like it ignores predicate pushdown entirely.


> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect 
> results:
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 






[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524999#comment-15524999
 ] 

Russell Spitzer commented on SPARK-17673:
-

We shouldn't be ... The only things we cache are underlying database 
connections and queries, which shouldn't factor into this, I would think :/

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect 
> results:
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 






[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524993#comment-15524993
 ] 

Herman van Hovell commented on SPARK-17673:
---

[~rspitzer] we only reuse an exchange when the two plans produce the same 
result. You 
can find the implementation for row datasource scans (what you are probably 
using) here 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L124-L128

I have taken a look at the cassandra datasource code, but that does not seem to 
implement equals at all and should not cause a problem. Do you cache instances 
of CassandraRelation at some point?
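
An editorial aside, not a suggestion from the thread: if exchange reuse itself is suspected, it can be switched off to confirm, since the ReuseExchange rule in Spark 2.0 is gated by the {{spark.sql.exchange.reuse}} setting (an internal flag). A minimal sketch, reusing {{sparkSession}}, {{min1}} and {{min2}} from the issue description:

{code}
// Blunt diagnostic: disable exchange reuse so both aggregations are planned
// and executed independently, then re-run the failing query.
sparkSession.conf.set("spark.sql.exchange.reuse", "false")
min1.union(min2).show()
{code}

If the output becomes correct with reuse disabled, that points at the reused exchange rather than the data source.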

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect 
> results:
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked at the explain plan and saw:
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> This plan was different from the one I got when using a parallelized collection 
> as the DF backing. So I tested the same code with a Parquet-backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> This leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-17678) Spark 1.6 Scala-2.11 repl doesn't honor "spark.replClassServer.port"

2016-09-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-17678:


Assignee: Apache Spark

> Spark 1.6 Scala-2.11 repl doesn't honor "spark.replClassServer.port"
> 
>
> Key: SPARK-17678
> URL: https://issues.apache.org/jira/browse/SPARK-17678
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell
>Affects Versions: 1.6.3
>Reporter: Saisai Shao
>Assignee: Apache Spark
>
> The Spark 1.6 Scala-2.11 repl doesn't honor the "spark.replClassServer.port" 
> configuration, so users cannot set a fixed port number through 
> "spark.replClassServer.port".
> There's no issue in Spark 2.0+, since this class was removed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-17678) Spark 1.6 Scala-2.11 repl doesn't honor "spark.replClassServer.port"

2016-09-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-17678:


Assignee: (was: Apache Spark)

> Spark 1.6 Scala-2.11 repl doesn't honor "spark.replClassServer.port"
> 
>
> Key: SPARK-17678
> URL: https://issues.apache.org/jira/browse/SPARK-17678
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell
>Affects Versions: 1.6.3
>Reporter: Saisai Shao
>
> The Spark 1.6 Scala-2.11 repl doesn't honor the "spark.replClassServer.port" 
> configuration, so users cannot set a fixed port number through 
> "spark.replClassServer.port".
> There's no issue in Spark 2.0+, since this class was removed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17678) Spark 1.6 Scala-2.11 repl doesn't honor "spark.replClassServer.port"

2016-09-26 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524976#comment-15524976
 ] 

Apache Spark commented on SPARK-17678:
--

User 'jerryshao' has created a pull request for this issue:
https://github.com/apache/spark/pull/15253

> Spark 1.6 Scala-2.11 repl doesn't honor "spark.replClassServer.port"
> 
>
> Key: SPARK-17678
> URL: https://issues.apache.org/jira/browse/SPARK-17678
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell
>Affects Versions: 1.6.3
>Reporter: Saisai Shao
>
> The Spark 1.6 Scala-2.11 repl doesn't honor the "spark.replClassServer.port" 
> configuration, so users cannot set a fixed port number through 
> "spark.replClassServer.port".
> There's no issue in Spark 2.0+, since this class was removed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17678) Spark 1.6 Scala-2.11 repl doesn't honor "spark.replClassServer.port"

2016-09-26 Thread Saisai Shao (JIRA)
Saisai Shao created SPARK-17678:
---

 Summary: Spark 1.6 Scala-2.11 repl doesn't honor 
"spark.replClassServer.port"
 Key: SPARK-17678
 URL: https://issues.apache.org/jira/browse/SPARK-17678
 Project: Spark
  Issue Type: Bug
  Components: Spark Shell
Affects Versions: 1.6.3
Reporter: Saisai Shao


The Spark 1.6 Scala-2.11 repl doesn't honor the "spark.replClassServer.port" 
configuration, so users cannot set a fixed port number through 
"spark.replClassServer.port".

There's no issue in Spark 2.0+, since this class was removed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17677) Break WindowExec.scala into multiple files

2016-09-26 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524955#comment-15524955
 ] 

Apache Spark commented on SPARK-17677:
--

User 'rxin' has created a pull request for this issue:
https://github.com/apache/spark/pull/15252

> Break WindowExec.scala into multiple files
> --
>
> Key: SPARK-17677
> URL: https://issues.apache.org/jira/browse/SPARK-17677
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>
> As of Spark 2.0, all the window function execution code is in 
> WindowExec.scala. This file is pretty large (over 1k loc) and has a lot of 
> different abstractions in it. It would be more maintainable to break it 
> into multiple pieces.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-17677) Break WindowExec.scala into multiple files

2016-09-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-17677:


Assignee: Apache Spark  (was: Reynold Xin)

> Break WindowExec.scala into multiple files
> --
>
> Key: SPARK-17677
> URL: https://issues.apache.org/jira/browse/SPARK-17677
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Reynold Xin
>Assignee: Apache Spark
>
> As of Spark 2.0, all the window function execution code is in 
> WindowExec.scala. This file is pretty large (over 1k loc) and has a lot of 
> different abstractions in it. It would be more maintainable to break it 
> into multiple pieces.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-17677) Break WindowExec.scala into multiple files

2016-09-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-17677:


Assignee: Reynold Xin  (was: Apache Spark)

> Break WindowExec.scala into multiple files
> --
>
> Key: SPARK-17677
> URL: https://issues.apache.org/jira/browse/SPARK-17677
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>
> As of Spark 2.0, all the window function execution code is in 
> WindowExec.scala. This file is pretty large (over 1k loc) and has a lot of 
> different abstractions in it. It would be more maintainable to break it 
> into multiple pieces.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17677) Break WindowExec.scala into multiple files

2016-09-26 Thread Reynold Xin (JIRA)
Reynold Xin created SPARK-17677:
---

 Summary: Break WindowExec.scala into multiple files
 Key: SPARK-17677
 URL: https://issues.apache.org/jira/browse/SPARK-17677
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Reporter: Reynold Xin
Assignee: Reynold Xin


As of Spark 2.0, all the window function execution code is in 
WindowExec.scala. This file is pretty large (over 1k loc) and has a lot of 
different abstractions in it. It would be more maintainable to break it into 
multiple pieces.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17676) FsHistoryProvider should ignore hidden files

2016-09-26 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524893#comment-15524893
 ] 

Apache Spark commented on SPARK-17676:
--

User 'squito' has created a pull request for this issue:
https://github.com/apache/spark/pull/15250

> FsHistoryProvider should ignore hidden files
> 
>
> Key: SPARK-17676
> URL: https://issues.apache.org/jira/browse/SPARK-17676
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: Imran Rashid
>Assignee: Imran Rashid
>Priority: Minor
>
> FsHistoryProvider currently reads hidden files (beginning with ".") from the 
> log dir.  However, it is writing a hidden file *itself* to that dir, which 
> cannot be parsed, as part of a trick to find the scan time according to the 
> file system:
> {code}
> val fileName = "." + UUID.randomUUID().toString
> val path = new Path(logDir, fileName)
> val fos = fs.create(path)
> {code}
> It does delete the tmp file immediately, but we've seen cases where that race 
> ends badly and an error is logged.  The error is harmless (the file is ignored 
> and Spark moves on to the other log files), but it is very confusing for users, 
> so we should avoid it.
> {noformat}
> 2016-09-26 09:10:03,016 ERROR 
> org.apache.spark.deploy.history.FsHistoryProvider: Exception encountered when 
> attempting to load application log 
> hdfs://XXX/user/spark/applicationHistory/.3a5e987c-ace5-4568-9ccd-6285010e399a
>  
> java.lang.IllegalArgumentException: Codec 
> [3a5e987c-ace5-4568-9ccd-6285010e399a] is not available. Consider setting 
> spark.io.compression.codec=lzf 
> at 
> org.apache.spark.io.CompressionCodec$$anonfun$createCodec$1.apply(CompressionCodec.scala:72)
>  
> at 
> org.apache.spark.io.CompressionCodec$$anonfun$createCodec$1.apply(CompressionCodec.scala:72)
>  
> at scala.Option.getOrElse(Option.scala:120) 
> at 
> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:72) 
> at 
> org.apache.spark.scheduler.EventLoggingListener$$anonfun$8$$anonfun$apply$1.apply(EventLoggingListener.scala:309)
>  
> at 
> org.apache.spark.scheduler.EventLoggingListener$$anonfun$8$$anonfun$apply$1.apply(EventLoggingListener.scala:309)
>  
> at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189) 
> at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91) 
> at 
> org.apache.spark.scheduler.EventLoggingListener$$anonfun$8.apply(EventLoggingListener.scala:309)
>  
> at 
> org.apache.spark.scheduler.EventLoggingListener$$anonfun$8.apply(EventLoggingListener.scala:308)
>  
> at scala.Option.map(Option.scala:145) 
> at 
> org.apache.spark.scheduler.EventLoggingListener$.openEventLog(EventLoggingListener.scala:308)
>  
> at 
> org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$replay(FsHistoryProvider.scala:518)
>  
> at 
> org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$10.apply(FsHistoryProvider.scala:359)
>  
> at 
> org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$10.apply(FsHistoryProvider.scala:356)
>  
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>  
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>  
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
> at 
> org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$mergeApplicationListing(FsHistoryProvider.scala:356)
> at 
> org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$checkForLogs$1$$anon$4.run(FsHistoryProvider.scala:277)
>  
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  
> at java.lang.Thread.run(Thread.java:745)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-17676) FsHistoryProvider should ignore hidden files

2016-09-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-17676:


Assignee: Imran Rashid  (was: Apache Spark)

> FsHistoryProvider should ignore hidden files
> 
>
> Key: SPARK-17676
> URL: https://issues.apache.org/jira/browse/SPARK-17676
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: Imran Rashid
>Assignee: Imran Rashid
>Priority: Minor
>
> FsHistoryProvider currently reads hidden files (beginning with ".") from the 
> log dir.  However, it is writing a hidden file *itself* to that dir, which 
> cannot be parsed, as part of a trick to find the scan time according to the 
> file system:
> {code}
> val fileName = "." + UUID.randomUUID().toString
> val path = new Path(logDir, fileName)
> val fos = fs.create(path)
> {code}
> It does delete the tmp file immediately, but we've seen cases where that race 
> ends badly and an error is logged.  The error is harmless (the file is ignored 
> and Spark moves on to the other log files), but it is very confusing for users, 
> so we should avoid it.
> {noformat}
> 2016-09-26 09:10:03,016 ERROR 
> org.apache.spark.deploy.history.FsHistoryProvider: Exception encountered when 
> attempting to load application log 
> hdfs://XXX/user/spark/applicationHistory/.3a5e987c-ace5-4568-9ccd-6285010e399a
>  
> java.lang.IllegalArgumentException: Codec 
> [3a5e987c-ace5-4568-9ccd-6285010e399a] is not available. Consider setting 
> spark.io.compression.codec=lzf 
> at 
> org.apache.spark.io.CompressionCodec$$anonfun$createCodec$1.apply(CompressionCodec.scala:72)
>  
> at 
> org.apache.spark.io.CompressionCodec$$anonfun$createCodec$1.apply(CompressionCodec.scala:72)
>  
> at scala.Option.getOrElse(Option.scala:120) 
> at 
> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:72) 
> at 
> org.apache.spark.scheduler.EventLoggingListener$$anonfun$8$$anonfun$apply$1.apply(EventLoggingListener.scala:309)
>  
> at 
> org.apache.spark.scheduler.EventLoggingListener$$anonfun$8$$anonfun$apply$1.apply(EventLoggingListener.scala:309)
>  
> at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189) 
> at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91) 
> at 
> org.apache.spark.scheduler.EventLoggingListener$$anonfun$8.apply(EventLoggingListener.scala:309)
>  
> at 
> org.apache.spark.scheduler.EventLoggingListener$$anonfun$8.apply(EventLoggingListener.scala:308)
>  
> at scala.Option.map(Option.scala:145) 
> at 
> org.apache.spark.scheduler.EventLoggingListener$.openEventLog(EventLoggingListener.scala:308)
>  
> at 
> org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$replay(FsHistoryProvider.scala:518)
>  
> at 
> org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$10.apply(FsHistoryProvider.scala:359)
>  
> at 
> org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$10.apply(FsHistoryProvider.scala:356)
>  
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>  
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>  
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
> at 
> org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$mergeApplicationListing(FsHistoryProvider.scala:356)
> at 
> org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$checkForLogs$1$$anon$4.run(FsHistoryProvider.scala:277)
>  
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  
> at java.lang.Thread.run(Thread.java:745)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-17676) FsHistoryProvider should ignore hidden files

2016-09-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-17676:


Assignee: Apache Spark  (was: Imran Rashid)

> FsHistoryProvider should ignore hidden files
> 
>
> Key: SPARK-17676
> URL: https://issues.apache.org/jira/browse/SPARK-17676
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: Imran Rashid
>Assignee: Apache Spark
>Priority: Minor
>
> FsHistoryProvider currently reads hidden files (beginning with ".") from the 
> log dir.  However, it is writing a hidden file *itself* to that dir, which 
> cannot be parsed, as part of a trick to find the scan time according to the 
> file system:
> {code}
> val fileName = "." + UUID.randomUUID().toString
> val path = new Path(logDir, fileName)
> val fos = fs.create(path)
> {code}
> It does delete the tmp file immediately, but we've seen cases where that race 
> ends badly and an error is logged.  The error is harmless (the file is ignored 
> and Spark moves on to the other log files), but it is very confusing for users, 
> so we should avoid it.
> {noformat}
> 2016-09-26 09:10:03,016 ERROR 
> org.apache.spark.deploy.history.FsHistoryProvider: Exception encountered when 
> attempting to load application log 
> hdfs://XXX/user/spark/applicationHistory/.3a5e987c-ace5-4568-9ccd-6285010e399a
>  
> java.lang.IllegalArgumentException: Codec 
> [3a5e987c-ace5-4568-9ccd-6285010e399a] is not available. Consider setting 
> spark.io.compression.codec=lzf 
> at 
> org.apache.spark.io.CompressionCodec$$anonfun$createCodec$1.apply(CompressionCodec.scala:72)
>  
> at 
> org.apache.spark.io.CompressionCodec$$anonfun$createCodec$1.apply(CompressionCodec.scala:72)
>  
> at scala.Option.getOrElse(Option.scala:120) 
> at 
> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:72) 
> at 
> org.apache.spark.scheduler.EventLoggingListener$$anonfun$8$$anonfun$apply$1.apply(EventLoggingListener.scala:309)
>  
> at 
> org.apache.spark.scheduler.EventLoggingListener$$anonfun$8$$anonfun$apply$1.apply(EventLoggingListener.scala:309)
>  
> at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189) 
> at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91) 
> at 
> org.apache.spark.scheduler.EventLoggingListener$$anonfun$8.apply(EventLoggingListener.scala:309)
>  
> at 
> org.apache.spark.scheduler.EventLoggingListener$$anonfun$8.apply(EventLoggingListener.scala:308)
>  
> at scala.Option.map(Option.scala:145) 
> at 
> org.apache.spark.scheduler.EventLoggingListener$.openEventLog(EventLoggingListener.scala:308)
>  
> at 
> org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$replay(FsHistoryProvider.scala:518)
>  
> at 
> org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$10.apply(FsHistoryProvider.scala:359)
>  
> at 
> org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$10.apply(FsHistoryProvider.scala:356)
>  
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>  
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>  
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
> at 
> org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$mergeApplicationListing(FsHistoryProvider.scala:356)
> at 
> org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$checkForLogs$1$$anon$4.run(FsHistoryProvider.scala:277)
>  
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  
> at java.lang.Thread.run(Thread.java:745)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17676) FsHistoryProvider should ignore hidden files

2016-09-26 Thread Imran Rashid (JIRA)
Imran Rashid created SPARK-17676:


 Summary: FsHistoryProvider should ignore hidden files
 Key: SPARK-17676
 URL: https://issues.apache.org/jira/browse/SPARK-17676
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: Imran Rashid
Assignee: Imran Rashid
Priority: Minor


FsHistoryProvider currently reads hidden files (beginning with ".") from the 
log dir.  However, it is writing a hidden file *itself* to that dir, which 
cannot be parsed, as part of a trick to find the scan time according to the 
file system:

{code}
val fileName = "." + UUID.randomUUID().toString
val path = new Path(logDir, fileName)
val fos = fs.create(path)
{code}

It does delete the tmp file immediately, but we've seen cases where that race 
ends badly and an error is logged.  The error is harmless (the file is ignored 
and Spark moves on to the other log files), but it is very confusing for users, 
so we should avoid it.

{noformat}
2016-09-26 09:10:03,016 ERROR 
org.apache.spark.deploy.history.FsHistoryProvider: Exception encountered when 
attempting to load application log 
hdfs://XXX/user/spark/applicationHistory/.3a5e987c-ace5-4568-9ccd-6285010e399a 
java.lang.IllegalArgumentException: Codec 
[3a5e987c-ace5-4568-9ccd-6285010e399a] is not available. Consider setting 
spark.io.compression.codec=lzf 
at 
org.apache.spark.io.CompressionCodec$$anonfun$createCodec$1.apply(CompressionCodec.scala:72)
 
at 
org.apache.spark.io.CompressionCodec$$anonfun$createCodec$1.apply(CompressionCodec.scala:72)
 
at scala.Option.getOrElse(Option.scala:120) 
at org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:72) 
at 
org.apache.spark.scheduler.EventLoggingListener$$anonfun$8$$anonfun$apply$1.apply(EventLoggingListener.scala:309)
 
at 
org.apache.spark.scheduler.EventLoggingListener$$anonfun$8$$anonfun$apply$1.apply(EventLoggingListener.scala:309)
 
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189) 
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91) 
at 
org.apache.spark.scheduler.EventLoggingListener$$anonfun$8.apply(EventLoggingListener.scala:309)
 
at 
org.apache.spark.scheduler.EventLoggingListener$$anonfun$8.apply(EventLoggingListener.scala:308)
 
at scala.Option.map(Option.scala:145) 
at 
org.apache.spark.scheduler.EventLoggingListener$.openEventLog(EventLoggingListener.scala:308)
 
at 
org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$replay(FsHistoryProvider.scala:518)
 
at 
org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$10.apply(FsHistoryProvider.scala:359)
 
at 
org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$10.apply(FsHistoryProvider.scala:356)
 
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
at 
org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$mergeApplicationListing(FsHistoryProvider.scala:356)
at 
org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$checkForLogs$1$$anon$4.run(FsHistoryProvider.scala:277)
 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745)
{noformat}
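
For what it's worth, the fix the summary suggests amounts to filtering "."-prefixed 
entries out of the directory listing before replay. Below is a minimal sketch of that 
filtering, assuming a Hadoop FileSystem listing; it is not the actual FsHistoryProvider 
patch, and the object and method names are made up for illustration:
{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object ListVisibleLogs {
  // Return only the entries the history provider should try to replay:
  // skip anything whose name starts with ".", including the scan-time marker file.
  def visibleLogs(fs: FileSystem, logDir: Path): Seq[FileStatus] = {
    fs.listStatus(logDir)
      .filterNot(_.getPath.getName.startsWith("."))
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    visibleLogs(fs, new Path(args(0))).foreach(status => println(status.getPath))
  }
}
{code}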



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524848#comment-15524848
 ] 

Russell Spitzer commented on SPARK-17673:
-

I couldn't get this to happen without C*; hopefully I can get some guidance 
tomorrow :/ It could be a hashCode / equals thing, but we don't override those 
in the base class. Also, I'm a little confused because this should be the same 
"grouping" operation on the RDD, just with a different aggregate. I don't know 
enough about ReusedExchange to know when it's applied and why.
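
One way to narrow this down (a sketch only; "spark.sql.exchange.reuse" is an internal 
Spark 2.0 flag, and the helper names below are mine, not from this thread): rerun the 
union of the two aggregations with exchange reuse disabled and compare the plan and the 
output against the default run. If the values come back correct with reuse off, the 
ReusedExchange is the culprit.
{code}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.min

object ReuseDiagnostic {
  def compareWithReuseDisabled(spark: SparkSession, df: DataFrame): Unit = {
    val min1 = df.groupBy("id").agg(min("col1").as("min"))
    val min2 = df.groupBy("id").agg(min("col2").as("min"))

    spark.conf.set("spark.sql.exchange.reuse", "false") // turn the ReuseExchange rule off
    min1.union(min2).explain() // should now show two independent Exchanges
    min1.union(min2).show()    // if the values are correct here, reuse is at fault
    spark.conf.set("spark.sql.exchange.reuse", "true")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("reuse-diag").master("local[*]").getOrCreate()
    import spark.implicits._
    // Local stand-in only; per this thread a parallelized backing does not trigger
    // reuse, so in practice `df` would be the Cassandra-loaded DataFrame.
    val df = Seq(("A", 1, 7)).toDF("id", "col1", "col2")
    compareWithReuseDisabled(spark, df)
  }
}
{code}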

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect 
> results:
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked at the explain plan and saw:
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> This plan was different from the one I got when using a parallelized collection 
> as the DF backing. So I tested the same code with a Parquet-backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> This leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-8425) Add blacklist mechanism for task scheduling

2016-09-26 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524811#comment-15524811
 ] 

Imran Rashid edited comment on SPARK-8425 at 9/27/16 2:20 AM:
--

Breaking off a smaller chunk of this that can be added independently in 
SPARK-17675


was (Author: irashid):
Breaking off a smaller chunk of this that can be added indepently in SPARK-17675

> Add blacklist mechanism for task scheduling
> ---
>
> Key: SPARK-8425
> URL: https://issues.apache.org/jira/browse/SPARK-8425
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler, YARN
>Reporter: Saisai Shao
>Assignee: Imran Rashid
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8425) Add blacklist mechanism for task scheduling

2016-09-26 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524811#comment-15524811
 ] 

Imran Rashid commented on SPARK-8425:
-

Breaking off a smaller chunk of this that can be added indepently in SPARK-17675

> Add blacklist mechanism for task scheduling
> ---
>
> Key: SPARK-8425
> URL: https://issues.apache.org/jira/browse/SPARK-8425
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler, YARN
>Reporter: Saisai Shao
>Assignee: Imran Rashid
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17675) Add Blacklisting of Executors & Nodes within one TaskSet

2016-09-26 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524790#comment-15524790
 ] 

Apache Spark commented on SPARK-17675:
--

User 'squito' has created a pull request for this issue:
https://github.com/apache/spark/pull/15249

> Add Blacklisting of Executors & Nodes within one TaskSet
> 
>
> Key: SPARK-17675
> URL: https://issues.apache.org/jira/browse/SPARK-17675
> Project: Spark
>  Issue Type: Task
>  Components: Scheduler
>Affects Versions: 2.0.0
>Reporter: Imran Rashid
>Assignee: Imran Rashid
>
> This is a step along the way to SPARK-8425 -- see the design doc on that jira 
> for a complete discussion of blacklisting.
> To enable incremental review, the first step proposed here is to expand the 
> blacklisting within tasksets.  In particular, this will enable blacklisting 
> for
> * (task, executor) pairs (this already exists via an undocumented config)
> * (task, node)
> * (taskset, executor)
> * (taskset, node)
> In particular, adding (task, node) is critical to making Spark fault-tolerant 
> of one bad disk in a cluster, without requiring careful tuning of 
> "spark.task.maxFailures".  The other additions are also important to avoid 
> many misleading task failures and long scheduling delays when there is one 
> bad node on a large cluster.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-17675) Add Blacklisting of Executors & Nodes within one TaskSet

2016-09-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-17675:


Assignee: Imran Rashid  (was: Apache Spark)

> Add Blacklisting of Executors & Nodes within one TaskSet
> 
>
> Key: SPARK-17675
> URL: https://issues.apache.org/jira/browse/SPARK-17675
> Project: Spark
>  Issue Type: Task
>  Components: Scheduler
>Affects Versions: 2.0.0
>Reporter: Imran Rashid
>Assignee: Imran Rashid
>
> This is a step along the way to SPARK-8425 -- see the design doc on that jira 
> for a complete discussion of blacklisting.
> To enable incremental review, the first step proposed here is to expand the 
> blacklisting within tasksets.  In particular, this will enable blacklisting 
> for
> * (task, executor) pairs (this already exists via an undocumented config)
> * (task, node)
> * (taskset, executor)
> * (taskset, node)
> In particular, adding (task, node) is critical to making Spark fault-tolerant 
> of one bad disk in a cluster, without requiring careful tuning of 
> "spark.task.maxFailures".  The other additions are also important to avoid 
> many misleading task failures and long scheduling delays when there is one 
> bad node on a large cluster.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-17675) Add Blacklisting of Executors & Nodes within one TaskSet

2016-09-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-17675:


Assignee: Apache Spark  (was: Imran Rashid)

> Add Blacklisting of Executors & Nodes within one TaskSet
> 
>
> Key: SPARK-17675
> URL: https://issues.apache.org/jira/browse/SPARK-17675
> Project: Spark
>  Issue Type: Task
>  Components: Scheduler
>Affects Versions: 2.0.0
>Reporter: Imran Rashid
>Assignee: Apache Spark
>
> This is a step along the way to SPARK-8425 -- see the design doc on that jira 
> for a complete discussion of blacklisting.
> To enable incremental review, the first step proposed here is to expand the 
> blacklisting within tasksets.  In particular, this will enable blacklisting 
> for
> * (task, executor) pairs (this already exists via an undocumented config)
> * (task, node)
> * (taskset, executor)
> * (taskset, node)
> In particular, adding (task, node) is critical to making Spark fault-tolerant 
> of one bad disk in a cluster, without requiring careful tuning of 
> "spark.task.maxFailures".  The other additions are also important to avoid 
> many misleading task failures and long scheduling delays when there is one 
> bad node on a large cluster.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17675) Add Blacklisting of Executors & Nodes within one TaskSet

2016-09-26 Thread Imran Rashid (JIRA)
Imran Rashid created SPARK-17675:


 Summary: Add Blacklisting of Executors & Nodes within one TaskSet
 Key: SPARK-17675
 URL: https://issues.apache.org/jira/browse/SPARK-17675
 Project: Spark
  Issue Type: Task
  Components: Scheduler
Affects Versions: 2.0.0
Reporter: Imran Rashid
Assignee: Imran Rashid


This is a step along the way to SPARK-8425 -- see the design doc on that jira 
for a complete discussion of blacklisting.

To enable incremental review, the first step proposed here is to expand the 
blacklisting within tasksets.  In particular, this will enable blacklisting for

* (task, executor) pairs (this already exists via an undocumented config)
* (task, node)
* (taskset, executor)
* (taskset, node)

In particular, adding (task, node) is critical to making Spark fault-tolerant 
of one bad disk in a cluster, without requiring careful tuning of 
"spark.task.maxFailures".  The other additions are also important to avoid many 
misleading task failures and long scheduling delays when there is one bad node 
on a large cluster.
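
To make the four levels above concrete, here is a minimal sketch of the bookkeeping 
they imply. This is not Spark's actual scheduler code; the class name, thresholds, 
and escalation rules are illustrative assumptions only.
{code}
import scala.collection.mutable

class TaskSetBlacklistSketch(maxTaskFailuresPerExec: Int = 2,
                             maxBadExecsPerNode: Int = 2) {
  // (task, executor) and (task, node): where each task index has already failed
  private val taskToExecs = mutable.Map.empty[Int, mutable.Set[String]]
  private val taskToNodes = mutable.Map.empty[Int, mutable.Set[String]]
  // per-executor failures and per-node executors, used to escalate to taskset level
  private val execFailedTasks = mutable.Map.empty[String, mutable.Set[Int]]
  private val nodeToExecs = mutable.Map.empty[String, mutable.Set[String]]
  private val badExecs = mutable.Set.empty[String]   // (taskset, executor)
  private val badNodes = mutable.Set.empty[String]   // (taskset, node)

  def taskFailed(task: Int, exec: String, node: String): Unit = {
    taskToExecs.getOrElseUpdate(task, mutable.Set.empty) += exec
    taskToNodes.getOrElseUpdate(task, mutable.Set.empty) += node
    execFailedTasks.getOrElseUpdate(exec, mutable.Set.empty) += task
    nodeToExecs.getOrElseUpdate(node, mutable.Set.empty) += exec
    // Escalation: enough distinct task failures blacklist the executor for the
    // whole taskset; enough blacklisted executors blacklist the node.
    if (execFailedTasks(exec).size >= maxTaskFailuresPerExec) badExecs += exec
    if (nodeToExecs(node).count(badExecs.contains) >= maxBadExecsPerNode) badNodes += node
  }

  // May `task` be offered this executor/node under all four blacklist levels?
  def canRunOn(task: Int, exec: String, node: String): Boolean =
    !badExecs.contains(exec) && !badNodes.contains(node) &&
      !taskToExecs.get(task).exists(_.contains(exec)) &&
      !taskToNodes.get(task).exists(_.contains(node))
}
{code}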



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524734#comment-15524734
 ] 

Russell Spitzer edited comment on SPARK-17673 at 9/27/16 1:39 AM:
--

Well, in this case they are equal, correct? We are using the same DataFrame with 
two different aggregation steps.

The Parquet example doesn't end up using the ReusedExchange. It produces the same 
plan as the parallelized one:

{code}
PDF
== Physical Plan ==
Union
:- *HashAggregate(keys=[id#112], functions=[min(col1#113)])
:  +- Exchange hashpartitioning(id#112, 200)
: +- *HashAggregate(keys=[id#112], functions=[partial_min(col1#113)])
:+- *BatchedScan parquet [id#112,col1#113] Format: ParquetFormat, 
InputPaths: 
file:/Users/russellspitzer/repos/spark-cassandra-connector/ꟾ뫳㼎麡䰨틖㇗ཨᎪ贬, 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct
+- *HashAggregate(keys=[id#112], functions=[min(col2#114)])
   +- Exchange hashpartitioning(id#112, 200)
  +- *HashAggregate(keys=[id#112], functions=[partial_min(col2#114)])
 +- *BatchedScan parquet [id#112,col2#114] Format: ParquetFormat, 
InputPaths: 
file:/Users/russellspitzer/repos/spark-cassandra-connector/ꟾ뫳㼎麡䰨틖㇗ཨᎪ贬, 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct
{code}


was (Author: rspitzer):
Well in this case they are equal correct?

The Parquet example doesn't end up using the reusedExchange

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect 
> results:
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked at the explain plan and saw:
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> This plan was different from the one I got when using a parallelized collection 
> as the DF backing. So I tested the same code with a Parquet-backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> This leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524734#comment-15524734
 ] 

Russell Spitzer edited comment on SPARK-17673 at 9/27/16 1:38 AM:
--

Well in this case they are equal correct?

The Parquet example doesn't end up using the reusedExchange


was (Author: rspitzer):
Well in this case they are equal correct?

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect 
> results:
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked at the explain plan and saw:
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> This plan was different from the one I got when using a parallelized collection 
> as the DF backing. So I tested the same code with a Parquet-backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> This leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524734#comment-15524734
 ] 

Russell Spitzer commented on SPARK-17673:
-

Well in this case they are equal correct?

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect 
> results:
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked at the explain plan and saw:
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> This plan was different from the one I got when using a parallelized collection 
> as the DF backing. So I tested the same code with a Parquet-backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> This leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524725#comment-15524725
 ] 

Reynold Xin commented on SPARK-17673:
-

It's possible if hashCode and equals are not defined properly in the Cassandra 
data source.


> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect 
> results:
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked at the explain plan and saw:
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> This plan was different from the one I got when using a parallelized collection 
> as the DF backing. So I tested the same code with a Parquet-backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> This leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524722#comment-15524722
 ] 

Russell Spitzer commented on SPARK-17673:
-

Ugh, I made a typo in my Parquet example; I don't see it reproing there now. Let 
me investigate a little more as to why this would affect the C* source...

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect 
> results:
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked at the explain plan and saw:
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> This plan was different from the one I got when using a parallelized collection 
> as the DF backing. So I tested the same code with a Parquet-backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> This leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Reynold Xin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin updated SPARK-17673:

Priority: Critical  (was: Major)

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> It was brought to my attention that the following code produces incorrect 
> results:
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524687#comment-15524687
 ] 

Reynold Xin commented on SPARK-17673:
-

Can you help create a repro (without the need to connect Cassandra)?
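
A rough sketch of what a Cassandra-free repro might look like, based purely on the 
Parquet variant quoted in the report below (untested assumption; the path, the 
spark-shell `spark` session and the case class are illustrative):

{code}
// Hypothetical Parquet-only repro of the union-of-two-aggregations pattern.
import org.apache.spark.sql.functions.min

case class TestData(id: String, col1: Int, col2: Int)

val frame = spark.createDataFrame(Seq(TestData("A", 1, 7)))
frame.write.mode("overwrite").parquet("/tmp/spark-17673-repro")

val loaded = spark.read.parquet("/tmp/spark-17673-repro").select("id", "col1", "col2")
val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
val min2 = loaded.groupBy("id").agg(min("col2").as("min"))

min1.union(min2).explain()  // look for a ReusedExchange feeding the second aggregate
min1.union(min2).show()     // reportedly yields (A, 1), (A, 1) instead of (A, 1), (A, 7)
{code}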

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524641#comment-15524641
 ] 

Russell Spitzer commented on SPARK-17673:
-

I only ran this on 2.0.0 and 2.0.1

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17674) Warnings from SparkR tests being ignored without redirecting to errors

2016-09-26 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-17674:
-
Description: 
For example, we are _currently_ seeing warnings such as the following:

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65905/consoleFull

{code}
Warnings ---
1. spark.mlp (@test_mllib.R#400) - is.na() applied to non-(list or vector) of 
type 'NULL'

2. spark.mlp (@test_mllib.R#401) - is.na() applied to non-(list or vector) of 
type 'NULL'
{code}

These warnings should be errors, as specified in 
https://github.com/apache/spark/blob/master/R/pkg/tests/run-all.R#L22 

However, the tests still seem to pass.

This seems related to the behaviour of the `testthat` library. We should 
investigate and fix it. This was also discussed in 
https://github.com/apache/spark/pull/15232

  was:
For example, we are _currently_ seeing warnings such as the following:

{code}
Warnings ---
1. spark.mlp (@test_mllib.R#400) - is.na() applied to non-(list or vector) of 
type 'NULL'

2. spark.mlp (@test_mllib.R#401) - is.na() applied to non-(list or vector) of 
type 'NULL'
{code}

These warnings should be errors, as specified in 
https://github.com/apache/spark/blob/master/R/pkg/tests/run-all.R#L22 

However, the tests still seem to pass.

This seems related to the behaviour of the `testthat` library. We should 
investigate and fix it. This was also discussed in 
https://github.com/apache/spark/pull/15232


> Warnings from SparkR tests being ignored without redirecting to errors
> --
>
> Key: SPARK-17674
> URL: https://issues.apache.org/jira/browse/SPARK-17674
> Project: Spark
>  Issue Type: Test
>  Components: SparkR
>Reporter: Hyukjin Kwon
>
> For example, we are _currently_ seeing warnings such as the following:
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65905/consoleFull
> {code}
> Warnings 
> ---
> 1. spark.mlp (@test_mllib.R#400) - is.na() applied to non-(list or vector) of 
> type 'NULL'
> 2. spark.mlp (@test_mllib.R#401) - is.na() applied to non-(list or vector) of 
> type 'NULL'
> {code}
> These warnings should be errors, as specified in 
> https://github.com/apache/spark/blob/master/R/pkg/tests/run-all.R#L22 
> However, the tests still seem to pass.
> This seems related to the behaviour of the `testthat` library. We should 
> investigate and fix it. This was also discussed in 
> https://github.com/apache/spark/pull/15232



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17674) Warnings from SparkR tests being ignored without redirecting to errors

2016-09-26 Thread Hyukjin Kwon (JIRA)
Hyukjin Kwon created SPARK-17674:


 Summary: Warnings from SparkR tests being ignored without 
redirecting to errors
 Key: SPARK-17674
 URL: https://issues.apache.org/jira/browse/SPARK-17674
 Project: Spark
  Issue Type: Test
  Components: SparkR
Reporter: Hyukjin Kwon


For example, we are _currently_ seeing warnings such as the following:

{code}
Warnings ---
1. spark.mlp (@test_mllib.R#400) - is.na() applied to non-(list or vector) of 
type 'NULL'

2. spark.mlp (@test_mllib.R#401) - is.na() applied to non-(list or vector) of 
type 'NULL'
{code}

These warnings should be errors, as specified in 
https://github.com/apache/spark/blob/master/R/pkg/tests/run-all.R#L22 

However, the tests still seem to pass.

This seems related to the behaviour of the `testthat` library. We should 
investigate and fix it. This was also discussed in 
https://github.com/apache/spark/pull/15232



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17672) Spark 2.0 history server web Ui takes too long for a single application

2016-09-26 Thread Gang Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524593#comment-15524593
 ] 

Gang Wu commented on SPARK-17672:
-

Hi [~ajbozarth], can you take a look at the PR? Thanks!

> Spark 2.0 history server web Ui takes too long for a single application
> ---
>
> Key: SPARK-17672
> URL: https://issues.apache.org/jira/browse/SPARK-17672
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.0.0
>Reporter: Gang Wu
>
> When there are 10K application history in the history server back end, it can 
> take a very long time to even get a single application history page. After 
> some investigation, I found the root cause was the following piece of code: 
> {code:title=OneApplicationResource.scala|borderStyle=solid}
> @Produces(Array(MediaType.APPLICATION_JSON))
> private[v1] class OneApplicationResource(uiRoot: UIRoot) {
>   @GET
>   def getApp(@PathParam("appId") appId: String): ApplicationInfo = {
> val apps = uiRoot.getApplicationInfoList.find { _.id == appId }
> apps.getOrElse(throw new NotFoundException("unknown app: " + appId))
>   }
> }
> {code}
> Although all application history infos are stored in a LinkedHashMap, here the 
> code transforms the map to an iterator and then uses the find() API, which is 
> O(n) instead of the O(1) of a map.get() operation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17671) Spark 2.0 history server summary page is slow even set spark.history.ui.maxApplications

2016-09-26 Thread Gang Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524595#comment-15524595
 ] 

Gang Wu commented on SPARK-17671:
-

Hi [~ajbozarth], can you take a look at the PR? Thanks!

> Spark 2.0 history server summary page is slow even set 
> spark.history.ui.maxApplications
> ---
>
> Key: SPARK-17671
> URL: https://issues.apache.org/jira/browse/SPARK-17671
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.0.0
>Reporter: Gang Wu
>
> This is a subsequent task of 
> [SPARK-17243|https://issues.apache.org/jira/browse/SPARK-17243]. After the 
> fix of SPARK-17243 (limit the number of applications in the JSON string 
> transferred from history server backend to web UI frontend), the history 
> server does display the target number of history summaries. 
> However, when there are more than 10k application history, it still gets 
> slower and slower. The problem is in the following code:
> {code:title=ApplicationListResource.scala|borderStyle=solid}
> @Produces(Array(MediaType.APPLICATION_JSON))
> private[v1] class ApplicationListResource(uiRoot: UIRoot) {
>   @GET
>   def appList(
>   @QueryParam("status") status: JList[ApplicationStatus],
>   @DefaultValue("2010-01-01") @QueryParam("minDate") minDate: 
> SimpleDateParam,
>   @DefaultValue("3000-01-01") @QueryParam("maxDate") maxDate: 
> SimpleDateParam,
>   @QueryParam("limit") limit: Integer)
>   : Iterator[ApplicationInfo] = {
> // although there is a limit operation in the end
> // the following line still does a transformation for all history 
> // in the list
> val allApps = uiRoot.getApplicationInfoList
> 
> // ...
> // irrelevant code is omitted 
> // ...
> if (limit != null) {
>   appList.take(limit)
> } else {
>   appList
> }
>   }
> }
> {code}
> What the code **uiRoot.getApplicationInfoList** does is to transform every 
> application history from class ApplicationHistoryInfo to class 
> ApplicationInfo. So if there are 10k applications, 10k transformations will 
> be done even if we have limited it to 5000 jobs here.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-17671) Spark 2.0 history server summary page is slow even set spark.history.ui.maxApplications

2016-09-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-17671:


Assignee: (was: Apache Spark)

> Spark 2.0 history server summary page is slow even set 
> spark.history.ui.maxApplications
> ---
>
> Key: SPARK-17671
> URL: https://issues.apache.org/jira/browse/SPARK-17671
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.0.0
>Reporter: Gang Wu
>
> This is a subsequent task of 
> [SPARK-17243|https://issues.apache.org/jira/browse/SPARK-17243]. After the 
> fix of SPARK-17243 (limit the number of applications in the JSON string 
> transferred from history server backend to web UI frontend), the history 
> server does display the target number of history summaries. 
> However, when there are more than 10k application history, it still gets 
> slower and slower. The problem is in the following code:
> {code:title=ApplicationListResource.scala|borderStyle=solid}
> @Produces(Array(MediaType.APPLICATION_JSON))
> private[v1] class ApplicationListResource(uiRoot: UIRoot) {
>   @GET
>   def appList(
>   @QueryParam("status") status: JList[ApplicationStatus],
>   @DefaultValue("2010-01-01") @QueryParam("minDate") minDate: 
> SimpleDateParam,
>   @DefaultValue("3000-01-01") @QueryParam("maxDate") maxDate: 
> SimpleDateParam,
>   @QueryParam("limit") limit: Integer)
>   : Iterator[ApplicationInfo] = {
> // although there is a limit operation in the end
> // the following line still does a transformation for all history 
> // in the list
> val allApps = uiRoot.getApplicationInfoList
> 
> // ...
> // irrelevant code is omitted 
> // ...
> if (limit != null) {
>   appList.take(limit)
> } else {
>   appList
> }
>   }
> }
> {code}
> What the code **uiRoot.getApplicationInfoList** does is to transform every 
> application history from class ApplicationHistoryInfo to class 
> ApplicationInfo. So if there are 10k applications, 10k transformations will 
> be done even if we have limited it to 5000 jobs here.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17671) Spark 2.0 history server summary page is slow even set spark.history.ui.maxApplications

2016-09-26 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524587#comment-15524587
 ] 

Apache Spark commented on SPARK-17671:
--

User 'wgtmac' has created a pull request for this issue:
https://github.com/apache/spark/pull/15248

> Spark 2.0 history server summary page is slow even set 
> spark.history.ui.maxApplications
> ---
>
> Key: SPARK-17671
> URL: https://issues.apache.org/jira/browse/SPARK-17671
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.0.0
>Reporter: Gang Wu
>
> This is a subsequent task of 
> [SPARK-17243|https://issues.apache.org/jira/browse/SPARK-17243]. After the 
> fix of SPARK-17243 (limit the number of applications in the JSON string 
> transferred from history server backend to web UI frontend), the history 
> server does display the target number of history summaries. 
> However, when there are more than 10k application history, it still gets 
> slower and slower. The problem is in the following code:
> {code:title=ApplicationListResource.scala|borderStyle=solid}
> @Produces(Array(MediaType.APPLICATION_JSON))
> private[v1] class ApplicationListResource(uiRoot: UIRoot) {
>   @GET
>   def appList(
>   @QueryParam("status") status: JList[ApplicationStatus],
>   @DefaultValue("2010-01-01") @QueryParam("minDate") minDate: 
> SimpleDateParam,
>   @DefaultValue("3000-01-01") @QueryParam("maxDate") maxDate: 
> SimpleDateParam,
>   @QueryParam("limit") limit: Integer)
>   : Iterator[ApplicationInfo] = {
> // although there is a limit operation in the end
> // the following line still does a transformation for all history 
> // in the list
> val allApps = uiRoot.getApplicationInfoList
> 
> // ...
> // irrelevant code is omitted 
> // ...
> if (limit != null) {
>   appList.take(limit)
> } else {
>   appList
> }
>   }
> }
> {code}
> What the code **uiRoot.getApplicationInfoList** does is to transform every 
> application history from class ApplicationHistoryInfo to class 
> ApplicationInfo. So if there are 10k applications, 10k transformations will 
> be done even if we have limited it to 5000 jobs here.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-17671) Spark 2.0 history server summary page is slow even set spark.history.ui.maxApplications

2016-09-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-17671:


Assignee: Apache Spark

> Spark 2.0 history server summary page is slow even set 
> spark.history.ui.maxApplications
> ---
>
> Key: SPARK-17671
> URL: https://issues.apache.org/jira/browse/SPARK-17671
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.0.0
>Reporter: Gang Wu
>Assignee: Apache Spark
>
> This is a subsequent task of 
> [SPARK-17243|https://issues.apache.org/jira/browse/SPARK-17243]. After the 
> fix of SPARK-17243 (limit the number of applications in the JSON string 
> transferred from history server backend to web UI frontend), the history 
> server does display the target number of history summaries. 
> However, when there are more than 10k application history, it still gets 
> slower and slower. The problem is in the following code:
> {code:title=ApplicationListResource.scala|borderStyle=solid}
> @Produces(Array(MediaType.APPLICATION_JSON))
> private[v1] class ApplicationListResource(uiRoot: UIRoot) {
>   @GET
>   def appList(
>   @QueryParam("status") status: JList[ApplicationStatus],
>   @DefaultValue("2010-01-01") @QueryParam("minDate") minDate: 
> SimpleDateParam,
>   @DefaultValue("3000-01-01") @QueryParam("maxDate") maxDate: 
> SimpleDateParam,
>   @QueryParam("limit") limit: Integer)
>   : Iterator[ApplicationInfo] = {
> // although there is a limit operation in the end
> // the following line still does a transformation for all history 
> // in the list
> val allApps = uiRoot.getApplicationInfoList
> 
> // ...
> // irrelevant code is omitted 
> // ...
> if (limit != null) {
>   appList.take(limit)
> } else {
>   appList
> }
>   }
> }
> {code}
> What the code **uiRoot.getApplicationInfoList** does is to transform every 
> application history from class ApplicationHistoryInfo to class 
> ApplicationInfo. So if there are 10k applications, 10k transformations will 
> be done even if we have limited it to 5000 jobs here.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524581#comment-15524581
 ] 

Herman van Hovell edited comment on SPARK-17673 at 9/27/16 12:15 AM:
-

[~rspitzer]] Are you using Spark 2.0 or the latest master?


was (Author: hvanhovell):
[~russell spitzer] Are you using Spark 2.0 or the latest master?

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524581#comment-15524581
 ] 

Herman van Hovell commented on SPARK-17673:
---

[~russell spitzer] Are you using Spark 2.0 or the latest master?

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524581#comment-15524581
 ] 

Herman van Hovell edited comment on SPARK-17673 at 9/27/16 12:16 AM:
-

[~rspitzer] Are you using Spark 2.0 or the latest master?


was (Author: hvanhovell):
[~rspitzer]] Are you using Spark 2.0 or the latest master?

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17672) Spark 2.0 history server web Ui takes too long for a single application

2016-09-26 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524524#comment-15524524
 ] 

Apache Spark commented on SPARK-17672:
--

User 'wgtmac' has created a pull request for this issue:
https://github.com/apache/spark/pull/15247

> Spark 2.0 history server web Ui takes too long for a single application
> ---
>
> Key: SPARK-17672
> URL: https://issues.apache.org/jira/browse/SPARK-17672
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.0.0
>Reporter: Gang Wu
>
> When there are 10K application history in the history server back end, it can 
> take a very long time to even get a single application history page. After 
> some investigation, I found the root cause was the following piece of code: 
> {code:title=OneApplicationResource.scala|borderStyle=solid}
> @Produces(Array(MediaType.APPLICATION_JSON))
> private[v1] class OneApplicationResource(uiRoot: UIRoot) {
>   @GET
>   def getApp(@PathParam("appId") appId: String): ApplicationInfo = {
> val apps = uiRoot.getApplicationInfoList.find { _.id == appId }
> apps.getOrElse(throw new NotFoundException("unknown app: " + appId))
>   }
> }
> {code}
> Although all application history infos are stored in a LinkedHashMap, here the 
> code transforms the map to an iterator and then uses the find() API, which is 
> O(n) instead of the O(1) of a map.get() operation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-17672) Spark 2.0 history server web Ui takes too long for a single application

2016-09-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-17672:


Assignee: (was: Apache Spark)

> Spark 2.0 history server web Ui takes too long for a single application
> ---
>
> Key: SPARK-17672
> URL: https://issues.apache.org/jira/browse/SPARK-17672
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.0.0
>Reporter: Gang Wu
>
> When there are 10K application history in the history server back end, it can 
> take a very long time to even get a single application history page. After 
> some investigation, I found the root cause was the following piece of code: 
> {code:title=OneApplicationResource.scala|borderStyle=solid}
> @Produces(Array(MediaType.APPLICATION_JSON))
> private[v1] class OneApplicationResource(uiRoot: UIRoot) {
>   @GET
>   def getApp(@PathParam("appId") appId: String): ApplicationInfo = {
> val apps = uiRoot.getApplicationInfoList.find { _.id == appId }
> apps.getOrElse(throw new NotFoundException("unknown app: " + appId))
>   }
> }
> {code}
> Although all application history infos are stored in a LinkedHashMap, here the 
> code transforms the map to an iterator and then uses the find() API, which is 
> O(n) instead of the O(1) of a map.get() operation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-17672) Spark 2.0 history server web Ui takes too long for a single application

2016-09-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-17672:


Assignee: Apache Spark

> Spark 2.0 history server web Ui takes too long for a single application
> ---
>
> Key: SPARK-17672
> URL: https://issues.apache.org/jira/browse/SPARK-17672
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.0.0
>Reporter: Gang Wu
>Assignee: Apache Spark
>
> When there are 10K application history in the history server back end, it can 
> take a very long time to even get a single application history page. After 
> some investigation, I found the root cause was the following piece of code: 
> {code:title=OneApplicationResource.scala|borderStyle=solid}
> @Produces(Array(MediaType.APPLICATION_JSON))
> private[v1] class OneApplicationResource(uiRoot: UIRoot) {
>   @GET
>   def getApp(@PathParam("appId") appId: String): ApplicationInfo = {
> val apps = uiRoot.getApplicationInfoList.find { _.id == appId }
> apps.getOrElse(throw new NotFoundException("unknown app: " + appId))
>   }
> }
> {code}
> Although all application history infos are stored in a LinkedHashMap, here the 
> code transforms the map to an iterator and then uses the find() API, which is 
> O(n) instead of the O(1) of a map.get() operation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Spitzer updated SPARK-17673:

Labels: correctness  (was: )

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)
Russell Spitzer created SPARK-17673:
---

 Summary: Reused Exchange Aggregations Produce Incorrect Results
 Key: SPARK-17673
 URL: https://issues.apache.org/jira/browse/SPARK-17673
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.0.0, 2.0.1
Reporter: Russell Spitzer


https://datastax-oss.atlassian.net/browse/SPARKC-429

It was brought to my attention that the following code produces incorrect results:

{code}
 val data = List(TestData("A", 1, 7))
val frame = 
session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))

frame.createCassandraTable(
  keySpaceName,
  table,
  partitionKeyColumns = Some(Seq("id")))

frame
  .write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Append)
  .options(Map("table" -> table, "keyspace" -> keySpaceName))
  .save()

val loaded = sparkSession.sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> table, "keyspace" -> ks))
  .load()
  .select("id", "col1", "col2")
val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
 min1.union(min2).show()
/* prints:
  +---+---+
  | id|min|
  +---+---+
  |  A|  1|
  |  A|  1|
  +---+---+
 Should be 
  | A| 1|
  | A| 7|
 */
{code}

I looked at the explain plan and saw:
{code}
Union
:- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
:  +- Exchange hashpartitioning(id#93, 200)
: +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
:+- *Scan 
org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 [id#93,col1#94]
+- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
   +- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
{code}

This was different from the plan I got when using a parallelized collection as the 
DataFrame backing, so I tested the same code with a Parquet-backed DataFrame and 
saw the same incorrect results.

{code}
frame.write.parquet("garbagetest")
val parquet = sparkSession.read.parquet("garbagetest").select("id", "col1", 
"col2")
println("PDF")
parquetmin1.union(parquetmin2).explain()
parquetmin1.union(parquetmin2).show()
/* prints:
  +---+---+
  | id|min|
  +---+---+
  |  A|  1|
  |  A|  1|
  +---+---+
*/
{code}

This leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-6624) Convert filters into CNF for data sources

2016-09-26 Thread Reynold Xin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin updated SPARK-6624:
---
Assignee: (was: Yijie Shen)

> Convert filters into CNF for data sources
> -
>
> Key: SPARK-6624
> URL: https://issues.apache.org/jira/browse/SPARK-6624
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>
> We should turn filters into conjunctive normal form (CNF) before we pass them 
> to data sources. Otherwise, filters are not very useful if there is a single 
> filter with a bunch of ORs.
> Note that we already try to do some of these in BooleanSimplification, but I 
> think we should just formalize it to use CNF.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17672) Spark 2.0 history server web Ui takes too long for a single application

2016-09-26 Thread Gang Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524424#comment-15524424
 ] 

Gang Wu commented on SPARK-17672:
-

I'm working on a fix and will send a PR soon.

> Spark 2.0 history server web Ui takes too long for a single application
> ---
>
> Key: SPARK-17672
> URL: https://issues.apache.org/jira/browse/SPARK-17672
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.0.0
>Reporter: Gang Wu
>
> When there are 10K application history in the history server back end, it can 
> take a very long time to even get a single application history page. After 
> some investigation, I found the root cause was the following piece of code: 
> {code:title=OneApplicationResource.scala|borderStyle=solid}
> @Produces(Array(MediaType.APPLICATION_JSON))
> private[v1] class OneApplicationResource(uiRoot: UIRoot) {
>   @GET
>   def getApp(@PathParam("appId") appId: String): ApplicationInfo = {
> val apps = uiRoot.getApplicationInfoList.find { _.id == appId }
> apps.getOrElse(throw new NotFoundException("unknown app: " + appId))
>   }
> }
> {code}
> Although all application history infos are stored in a LinkedHashMap, here the 
> code transforms the map to an iterator and then uses the find() API, which is 
> O(n) instead of the O(1) of a map.get() operation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17672) Spark 2.0 history server web Ui takes too long for a single application

2016-09-26 Thread Gang Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gang Wu updated SPARK-17672:

Description: 
When there are 10K application histories in the history server back end, it can 
take a very long time to get even a single application history page. After some 
investigation, I found the root cause was the following piece of code: 

{code:title=OneApplicationResource.scala|borderStyle=solid}
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class OneApplicationResource(uiRoot: UIRoot) {

  @GET
  def getApp(@PathParam("appId") appId: String): ApplicationInfo = {
val apps = uiRoot.getApplicationInfoList.find { _.id == appId }
apps.getOrElse(throw new NotFoundException("unknown app: " + appId))
  }

}
{code}

Although all application history infos are stored in a LinkedHashMap, here the 
code transforms the map to an iterator and then uses the find() API, which is 
O(n) instead of the O(1) of a map.get() operation.

  was:
When there are 10K application histories in the history server back end, it can 
take a very long time to get even a single application history page. After some 
investigation, I found the root cause was the following piece of code: 

{code:title=OneApplicationResource.scala|borderStyle=solid}
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class OneApplicationResource(uiRoot: UIRoot) {

  @GET
  def getApp(@PathParam("appId") appId: String): ApplicationInfo = {
val apps = uiRoot.getApplicationInfoList.find { _.id == appId }
apps.getOrElse(throw new NotFoundException("unknown app: " + appId))
  }

}
{code}

Although all application history infos are stored in a LinkedHashMap, here the 
code transforms the map to an iterator and then uses the find() API, which is 
O(n) instead of the O(1) of a map.get() operation.


> Spark 2.0 history server web Ui takes too long for a single application
> ---
>
> Key: SPARK-17672
> URL: https://issues.apache.org/jira/browse/SPARK-17672
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.0.0
>Reporter: Gang Wu
>
> When there are 10K application history in the history server back end, it can 
> take a very long time to even get a single application history page. After 
> some investigation, I found the root cause was the following piece of code: 
> {code:title=OneApplicationResource.scala|borderStyle=solid}
> @Produces(Array(MediaType.APPLICATION_JSON))
> private[v1] class OneApplicationResource(uiRoot: UIRoot) {
>   @GET
>   def getApp(@PathParam("appId") appId: String): ApplicationInfo = {
> val apps = uiRoot.getApplicationInfoList.find { _.id == appId }
> apps.getOrElse(throw new NotFoundException("unknown app: " + appId))
>   }
> }
> {code}
> Although all application history infos are stored in a LinkedHashMap, here the 
> code transforms the map to an iterator and then uses the find() API, which is 
> O(n) instead of the O(1) of a map.get() operation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17672) Spark 2.0 history server web Ui takes too long for a single application

2016-09-26 Thread Gang Wu (JIRA)
Gang Wu created SPARK-17672:
---

 Summary: Spark 2.0 history server web Ui takes too long for a 
single application
 Key: SPARK-17672
 URL: https://issues.apache.org/jira/browse/SPARK-17672
 Project: Spark
  Issue Type: Bug
  Components: Web UI
Affects Versions: 2.0.0
Reporter: Gang Wu


When there are 10K application histories in the history server back end, it can 
take a very long time to get even a single application history page. After some 
investigation, I found the root cause was the following piece of code: 

{code:title=OneApplicationResource.scala|borderStyle=solid}
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class OneApplicationResource(uiRoot: UIRoot) {

  @GET
  def getApp(@PathParam("appId") appId: String): ApplicationInfo = {
val apps = uiRoot.getApplicationInfoList.find { _.id == appId }
apps.getOrElse(throw new NotFoundException("unknown app: " + appId))
  }

}
{code}

Although all application history infos are stored in a LinkedHashMap, here the 
code transforms the map to an iterator and then uses the find() API, which is 
O(n) instead of the O(1) of a map.get() operation.
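
A minimal sketch of the constant-time lookup this points at, assuming UIRoot (or 
the history provider behind it) could expose a map keyed by application id; the 
accessor name below is an assumption, not the existing API:

{code}
@GET
def getApp(@PathParam("appId") appId: String): ApplicationInfo = {
  // Hypothetical map-based accessor instead of scanning the whole list.
  uiRoot.getApplicationInfoMap          // assumed: Map[String, ApplicationInfo]
    .get(appId)                         // O(1) lookup
    .getOrElse(throw new NotFoundException("unknown app: " + appId))
}
{code}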



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17653) Optimizer should remove unnecessary distincts (in multiple unions)

2016-09-26 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524175#comment-15524175
 ] 

Xiao Li commented on SPARK-17653:
-

Since Simon already submitted the PR, I will not continue the investigation. 
Thanks for answering my original question. 

> Optimizer should remove unnecessary distincts (in multiple unions)
> --
>
> Key: SPARK-17653
> URL: https://issues.apache.org/jira/browse/SPARK-17653
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Reynold Xin
>
> Query:
> {code}
> select 1 a union select 2 b union select 3 c
> {code}
> Explain plan:
> {code}
> == Physical Plan ==
> *HashAggregate(keys=[a#13], functions=[])
> +- Exchange hashpartitioning(a#13, 200)
>+- *HashAggregate(keys=[a#13], functions=[])
>   +- Union
>  :- *HashAggregate(keys=[a#13], functions=[])
>  :  +- Exchange hashpartitioning(a#13, 200)
>  : +- *HashAggregate(keys=[a#13], functions=[])
>  :+- Union
>  :   :- *Project [1 AS a#13]
>  :   :  +- Scan OneRowRelation[]
>  :   +- *Project [2 AS b#14]
>  :  +- Scan OneRowRelation[]
>  +- *Project [3 AS c#15]
> +- Scan OneRowRelation[]
> {code}
> Only one distinct should be necessary. This makes a bunch of unions slower 
> than a bunch of union alls followed by a distinct.
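
For reference, a hand-written sketch of that workaround in the DataFrame API 
(Dataset.union already has UNION ALL semantics, so a single distinct at the end 
stands in for the per-union deduplication; untested, `spark` is a SparkSession):

{code}
import spark.implicits._

// Sketch of the manual rewrite: union-all everything, deduplicate once.
val u = Seq(1).toDF("a")
  .union(Seq(2).toDF("a"))
  .union(Seq(3).toDF("a"))
  .distinct()

u.explain()  // expect a single distinct (one exchange) over one Union, not nested ones
{code}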



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17666) take() or isEmpty() on dataset leaks s3a connections

2016-09-26 Thread Josh Rosen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Rosen updated SPARK-17666:
---
Target Version/s: 2.0.2, 2.1.0
Priority: Critical  (was: Major)

> take() or isEmpty() on dataset leaks s3a connections
> 
>
> Key: SPARK-17666
> URL: https://issues.apache.org/jira/browse/SPARK-17666
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
> Environment: ubuntu/centos, java 7, java 8, spark 2.0, java api
>Reporter: Igor Berman
>Priority: Critical
>
> I'm experiencing problems with s3a when working with parquet via the dataset api;
> the symptom of the problem is tasks failing with 
> {code}
> Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout 
> waiting for connection from pool
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:232)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:199)
>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> {code}
> Checking CoarseGrainedExecutorBackend with lsof gave me many sockets in 
> CLOSE_WAIT state
> reproduction of problem:
> {code}
> package com.test;
> import java.text.ParseException;
> import org.apache.spark.SparkConf;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> public class ConnectionLeakTest {
>   public static void main(String[] args) throws ParseException {
>   SparkConf sparkConf = new SparkConf();
>   sparkConf.setMaster("local[*]");
>   sparkConf.setAppName("Test");
>   sparkConf.set("spark.local.dir", "/tmp/spark");
>   sparkConf.set("spark.sql.shuffle.partitions", "2");
>   SparkSession session = 
> SparkSession.builder().config(sparkConf).getOrCreate();
> //set your credentials to your bucket
>   for (int i = 0; i < 100; i++) {
>   Dataset df = session
>   .sqlContext()
>   .read()
>   .parquet("s3a://test/*");//contains 
> multiple snappy compressed parquet files
>   if (df.rdd().isEmpty()) {//same problem with 
> takeAsList().isEmpty()
>   System.out.println("Yes");
>   } else {
>   System.out.println("No");
>   }
>   }
>   System.out.println("Done");
>   }
> }
> {code}
> so when program runs, you can jps for pid and do lsof -p  | grep https
> and you'll see a constant growth of CLOSE_WAITs.
> Our way to bypass the problem is to use count() == 0.
> In addition, we've seen that df.dropDuplicates("a","b").rdd().isEmpty() 
> doesn't trigger the problem either.
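
A tiny Scala sketch of the workaround described above (why count() avoids the leak 
while isEmpty()/take() do not is not established in this thread):

{code}
// Workaround sketch: prefer count() == 0 over rdd().isEmpty() / take()
// when checking emptiness of a Parquet-on-s3a dataset.
val df = session.read.parquet("s3a://test/*")
val empty = df.count() == 0
{code}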



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17671) Spark 2.0 history server summary page is slow even set spark.history.ui.maxApplications

2016-09-26 Thread Gang Wu (JIRA)
Gang Wu created SPARK-17671:
---

 Summary: Spark 2.0 history server summary page is slow even set 
spark.history.ui.maxApplications
 Key: SPARK-17671
 URL: https://issues.apache.org/jira/browse/SPARK-17671
 Project: Spark
  Issue Type: Bug
  Components: Web UI
Affects Versions: 2.0.0
Reporter: Gang Wu


This is a subsequent task of 
[SPARK-17243|https://issues.apache.org/jira/browse/SPARK-17243]. After the fix 
of SPARK-17243 (limit the number of applications in the JSON string transferred 
from history server backend to web UI frontend), the history server does 
display the target number of history summaries. 

However, when there are more than 10k application histories, it still gets slower 
and slower. The problem is in the following code:

{code:title=ApplicationListResource.scala|borderStyle=solid}
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class ApplicationListResource(uiRoot: UIRoot) {

  @GET
  def appList(
  @QueryParam("status") status: JList[ApplicationStatus],
  @DefaultValue("2010-01-01") @QueryParam("minDate") minDate: 
SimpleDateParam,
  @DefaultValue("3000-01-01") @QueryParam("maxDate") maxDate: 
SimpleDateParam,
  @QueryParam("limit") limit: Integer)
  : Iterator[ApplicationInfo] = {
// although there is a limit operation in the end
// the following line still does a transformation for all history 
// in the list
val allApps = uiRoot.getApplicationInfoList

// ...
// irrelevant code is omitted 
// ...

if (limit != null) {
  appList.take(limit)
} else {
  appList
}
  }
}
{code}

What the code **uiRoot.getApplicationInfoList** does is transform every 
application history from class ApplicationHistoryInfo to class ApplicationInfo. 
So if there are 10k applications, 10k transformations will be done even if we 
have limited the page to 5000 jobs.
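
One possible shape of a fix, sketched under the assumption that the raw history 
list and a per-item converter are reachable from here; both names below are 
assumptions, not the existing API, and the status/date filtering of the real 
method is omitted:

{code}
// Hypothetical sketch: truncate before converting, so at most `limit`
// ApplicationHistoryInfo -> ApplicationInfo conversions are performed.
val history = uiRoot.getRawApplicationHistoryList   // assumed accessor
val limited = if (limit != null) history.take(limit) else history
limited.map(toApplicationInfo).iterator             // assumed per-item converter
{code}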






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17666) take() or isEmpty() on dataset leaks s3a connections

2016-09-26 Thread Josh Rosen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Rosen updated SPARK-17666:
---
Component/s: (was: Java API)
 SQL

> take() or isEmpty() on dataset leaks s3a connections
> 
>
> Key: SPARK-17666
> URL: https://issues.apache.org/jira/browse/SPARK-17666
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
> Environment: ubuntu/centos, java 7, java 8, spark 2.0, java api
>Reporter: Igor Berman
>
> I'm experiencing problems with s3a when working with parquet via the dataset api.
> The symptom of the problem: tasks failing with
> {code}
> Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout 
> waiting for connection from pool
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:232)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:199)
>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> {code}
> Checking CoarseGrainedExecutorBackend with lsof gave me many sockets in 
> CLOSE_WAIT state
> reproduction of problem:
> {code}
> package com.test;
> import java.text.ParseException;
> import org.apache.spark.SparkConf;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> public class ConnectionLeakTest {
>   public static void main(String[] args) throws ParseException {
>   SparkConf sparkConf = new SparkConf();
>   sparkConf.setMaster("local[*]");
>   sparkConf.setAppName("Test");
>   sparkConf.set("spark.local.dir", "/tmp/spark");
>   sparkConf.set("spark.sql.shuffle.partitions", "2");
>   SparkSession session = 
> SparkSession.builder().config(sparkConf).getOrCreate();
> //set your credentials to your bucket
>   for (int i = 0; i < 100; i++) {
>   Dataset df = session
>   .sqlContext()
>   .read()
>   .parquet("s3a://test/*");//contains 
> multiple snappy compressed parquet files
>   if (df.rdd().isEmpty()) {//same problem with 
> takeAsList().isEmpty()
>   System.out.println("Yes");
>   } else {
>   System.out.println("No");
>   }
>   }
>   System.out.println("Done");
>   }
> }
> {code}
> So when the program runs, you can find the pid with jps and do lsof -p  | grep https,
> and you'll see constant growth of CLOSE_WAITs.
> Our way to bypass the problem is to use count() == 0.
> In addition we've seen that df.dropDuplicates("a","b").rdd().isEmpty()
> doesn't produce the problem either.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17671) Spark 2.0 history server summary page is slow even set spark.history.ui.maxApplications

2016-09-26 Thread Gang Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524173#comment-15524173
 ] 

Gang Wu commented on SPARK-17671:
-

I'm working on this and will send a pull request soon.

> Spark 2.0 history server summary page is slow even set 
> spark.history.ui.maxApplications
> ---
>
> Key: SPARK-17671
> URL: https://issues.apache.org/jira/browse/SPARK-17671
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.0.0
>Reporter: Gang Wu
>
> This is a subsequent task of 
> [SPARK-17243|https://issues.apache.org/jira/browse/SPARK-17243]. After the 
> fix of SPARK-17243 (limit the number of applications in the JSON string 
> transferred from history server backend to web UI frontend), the history 
> server does display the target number of history summaries. 
> However, when there are more than 10k application histories, it still gets 
> slower and slower. The problem is in the following code:
> {code:title=ApplicationListResource.scala|borderStyle=solid}
> @Produces(Array(MediaType.APPLICATION_JSON))
> private[v1] class ApplicationListResource(uiRoot: UIRoot) {
>   @GET
>   def appList(
>   @QueryParam("status") status: JList[ApplicationStatus],
>   @DefaultValue("2010-01-01") @QueryParam("minDate") minDate: 
> SimpleDateParam,
>   @DefaultValue("3000-01-01") @QueryParam("maxDate") maxDate: 
> SimpleDateParam,
>   @QueryParam("limit") limit: Integer)
>   : Iterator[ApplicationInfo] = {
> // although there is a limit operation in the end
> // the following line still does a transformation for all history 
> // in the list
> val allApps = uiRoot.getApplicationInfoList
> 
> // ...
> // irrelevant code is omitted 
> // ...
> if (limit != null) {
>   appList.take(limit)
> } else {
>   appList
> }
>   }
> }
> {code}
> What the code **uiRoot.getApplicationInfoList** does is transform every 
> application history from class ApplicationHistoryInfo to class 
> ApplicationInfo. So if there are 10k applications, 10k transformations are 
> done even though we have limited the result to 5000 jobs here.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17669) Strange behavior using Datasets

2016-09-26 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524166#comment-15524166
 ] 

Sean Owen commented on SPARK-17669:
---

It's hard to say what is going on without knowing what you're executing. Lots 
of things changed in 2.0, and it's not inconceivable that some things actually 
go slower. 

> Strange behavior using Datasets
> ---
>
> Key: SPARK-17669
> URL: https://issues.apache.org/jira/browse/SPARK-17669
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Web UI
>Affects Versions: 2.0.0
>Reporter: Miles Crawford
>
> I recently migrated my application to Spark 2.0, and everything worked well, 
> except for one function that uses "toDS" and the ML libraries.
> This stage used to complete in 15 minutes or so on 1.6.2, and now takes 
> almost two hours.
> The UI shows very strange behavior - completed stages still being worked on, 
> concurrent work on tons of stages, including ones from downstream jobs:
> https://dl.dropboxusercontent.com/u/231152/spark.png
> The only source change I made was changing "toDF" to "toDS()" before handing 
> my RDDs to the ML libraries.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-17666) take() or isEmpty() on dataset leaks s3a connections

2016-09-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-17666:


Assignee: Apache Spark

> take() or isEmpty() on dataset leaks s3a connections
> 
>
> Key: SPARK-17666
> URL: https://issues.apache.org/jira/browse/SPARK-17666
> Project: Spark
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 2.0.0
> Environment: ubuntu/centos, java 7, java 8, spark 2.0, java api
>Reporter: Igor Berman
>Assignee: Apache Spark
>
> I'm experiencing problems with s3a when working with parquet via the dataset api.
> The symptom of the problem: tasks failing with
> {code}
> Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout 
> waiting for connection from pool
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:232)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:199)
>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> {code}
> Checking CoarseGrainedExecutorBackend with lsof gave me many sockets in 
> CLOSE_WAIT state
> reproduction of problem:
> {code}
> package com.test;
> import java.text.ParseException;
> import org.apache.spark.SparkConf;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> public class ConnectionLeakTest {
>   public static void main(String[] args) throws ParseException {
>   SparkConf sparkConf = new SparkConf();
>   sparkConf.setMaster("local[*]");
>   sparkConf.setAppName("Test");
>   sparkConf.set("spark.local.dir", "/tmp/spark");
>   sparkConf.set("spark.sql.shuffle.partitions", "2");
>   SparkSession session = 
> SparkSession.builder().config(sparkConf).getOrCreate();
> //set your credentials to your bucket
>   for (int i = 0; i < 100; i++) {
>   Dataset df = session
>   .sqlContext()
>   .read()
>   .parquet("s3a://test/*");//contains 
> multiple snappy compressed parquet files
>   if (df.rdd().isEmpty()) {//same problem with 
> takeAsList().isEmpty()
>   System.out.println("Yes");
>   } else {
>   System.out.println("No");
>   }
>   }
>   System.out.println("Done");
>   }
> }
> {code}
> So when the program runs, you can find the pid with jps and do lsof -p  | grep https,
> and you'll see constant growth of CLOSE_WAITs.
> Our way to bypass the problem is to use count() == 0.
> In addition we've seen that df.dropDuplicates("a","b").rdd().isEmpty()
> doesn't produce the problem either.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17666) take() or isEmpty() on dataset leaks s3a connections

2016-09-26 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524108#comment-15524108
 ] 

Apache Spark commented on SPARK-17666:
--

User 'JoshRosen' has created a pull request for this issue:
https://github.com/apache/spark/pull/15245

> take() or isEmpty() on dataset leaks s3a connections
> 
>
> Key: SPARK-17666
> URL: https://issues.apache.org/jira/browse/SPARK-17666
> Project: Spark
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 2.0.0
> Environment: ubuntu/centos, java 7, java 8, spark 2.0, java api
>Reporter: Igor Berman
>
> I'm experiencing problems with s3a when working with parquet via the dataset api.
> The symptom of the problem: tasks failing with
> {code}
> Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout 
> waiting for connection from pool
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:232)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:199)
>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> {code}
> Checking CoarseGrainedExecutorBackend with lsof gave me many sockets in 
> CLOSE_WAIT state
> reproduction of problem:
> {code}
> package com.test;
> import java.text.ParseException;
> import org.apache.spark.SparkConf;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> public class ConnectionLeakTest {
>   public static void main(String[] args) throws ParseException {
>   SparkConf sparkConf = new SparkConf();
>   sparkConf.setMaster("local[*]");
>   sparkConf.setAppName("Test");
>   sparkConf.set("spark.local.dir", "/tmp/spark");
>   sparkConf.set("spark.sql.shuffle.partitions", "2");
>   SparkSession session = 
> SparkSession.builder().config(sparkConf).getOrCreate();
> //set your credentials to your bucket
>   for (int i = 0; i < 100; i++) {
>   Dataset df = session
>   .sqlContext()
>   .read()
>   .parquet("s3a://test/*");//contains 
> multiple snappy compressed parquet files
>   if (df.rdd().isEmpty()) {//same problem with 
> takeAsList().isEmpty()
>   System.out.println("Yes");
>   } else {
>   System.out.println("No");
>   }
>   }
>   System.out.println("Done");
>   }
> }
> {code}
> So when the program runs, you can find the pid with jps and do lsof -p  | grep https,
> and you'll see constant growth of CLOSE_WAITs.
> Our way to bypass the problem is to use count() == 0.
> In addition we've seen that df.dropDuplicates("a","b").rdd().isEmpty()
> doesn't produce the problem either.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-17666) take() or isEmpty() on dataset leaks s3a connections

2016-09-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-17666:


Assignee: (was: Apache Spark)

> take() or isEmpty() on dataset leaks s3a connections
> 
>
> Key: SPARK-17666
> URL: https://issues.apache.org/jira/browse/SPARK-17666
> Project: Spark
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 2.0.0
> Environment: ubuntu/centos, java 7, java 8, spark 2.0, java api
>Reporter: Igor Berman
>
> I'm experiencing problems with s3a when working with parquet via the dataset api.
> The symptom of the problem: tasks failing with
> {code}
> Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout 
> waiting for connection from pool
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:232)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:199)
>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> {code}
> Checking CoarseGrainedExecutorBackend with lsof gave me many sockets in 
> CLOSE_WAIT state
> reproduction of problem:
> {code}
> package com.test;
> import java.text.ParseException;
> import org.apache.spark.SparkConf;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> public class ConnectionLeakTest {
>   public static void main(String[] args) throws ParseException {
>   SparkConf sparkConf = new SparkConf();
>   sparkConf.setMaster("local[*]");
>   sparkConf.setAppName("Test");
>   sparkConf.set("spark.local.dir", "/tmp/spark");
>   sparkConf.set("spark.sql.shuffle.partitions", "2");
>   SparkSession session = 
> SparkSession.builder().config(sparkConf).getOrCreate();
> //set your credentials to your bucket
>   for (int i = 0; i < 100; i++) {
>   Dataset df = session
>   .sqlContext()
>   .read()
>   .parquet("s3a://test/*");//contains 
> multiple snappy compressed parquet files
>   if (df.rdd().isEmpty()) {//same problem with 
> takeAsList().isEmpty()
>   System.out.println("Yes");
>   } else {
>   System.out.println("No");
>   }
>   }
>   System.out.println("Done");
>   }
> }
> {code}
> So when the program runs, you can find the pid with jps and do lsof -p  | grep https,
> and you'll see constant growth of CLOSE_WAITs.
> Our way to bypass the problem is to use count() == 0.
> In addition we've seen that df.dropDuplicates("a","b").rdd().isEmpty()
> doesn't produce the problem either.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17670) Spark DataFrame/Dataset no longer supports Option[Map] in case classes

2016-09-26 Thread Daniel Williams (JIRA)
Daniel Williams created SPARK-17670:
---

 Summary: Spark DataFrame/Dataset no longer supports Option[Map] in 
case classes
 Key: SPARK-17670
 URL: https://issues.apache.org/jira/browse/SPARK-17670
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.0.0
Reporter: Daniel Williams


Upon upgrading to Spark 2.0 I discovered that previously supported case classes 
containing members of type Option[Map], with any key/value binding, mutable or 
immutable, were no longer supported and produced an exception similar to the 
following. Upon further testing I also noticed that Option was still supported 
for Seq, case classes, and primitives. The validating unit tests included below 
use spark-testing-base.

{code}
org.apache.spark.sql.AnalysisException: cannot resolve 
'wrapoption(staticinvoke(class 
org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface 
scala.collection.Map), toScalaMap, mapobjects(MapObjects_loopValue32, 
MapObjects_loopIsNull33, StringType, lambdavariable(MapObjects_loopValue32, 
MapObjects_loopIsNull33, StringType).toString, 
cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31, 
StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true), 
StructField(sourceSystem,StringType,true), 
StructField(input,MapType(StringType,StringType,true),true)).input as 
map).keyArray).array, mapobjects(MapObjects_loopValue34, 
MapObjects_loopIsNull35, StringType, lambdavariable(MapObjects_loopValue34, 
MapObjects_loopIsNull35, StringType).toString, 
cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31, 
StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true), 
StructField(sourceSystem,StringType,true), 
StructField(input,MapType(StringType,StringType,true),true)).input as 
map).valueArray).array, true), ObjectType(interface 
scala.collection.immutable.Map))' due to data type mismatch: argument 1 
requires scala.collection.immutable.Map type, however, 'staticinvoke(class 
org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface 
scala.collection.Map), toScalaMap, mapobjects(MapObjects_loopValue32, 
MapObjects_loopIsNull33, StringType, lambdavariable(MapObjects_loopValue32, 
MapObjects_loopIsNull33, StringType).toString, 
cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31, 
StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true), 
StructField(sourceSystem,StringType,true), 
StructField(input,MapType(StringType,StringType,true),true)).input as 
map).keyArray).array, mapobjects(MapObjects_loopValue34, 
MapObjects_loopIsNull35, StringType, lambdavariable(MapObjects_loopValue34, 
MapObjects_loopIsNull35, StringType).toString, 
cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31, 
StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true), 
StructField(sourceSystem,StringType,true), 
StructField(input,MapType(StringType,StringType,true),true)).input as 
map).valueArray).array, true)' is of scala.collection.Map type.;
at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:82)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
{code}

Unit tests:

{code}
import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
import org.scalatest.{Matchers, FunSuite}
import org.slf4j.LoggerFactory

import scala.util.{Failure, Try, Success}

case class ImmutableMapTest(data: Map[String, String])
case class MapTest(data: scala.collection.mutable.Map[String, String])
case class ImmtableWithOption(data: Option[Map[String, String]])
case class MutableWithOption(data: Option[scala.collection.mutable.Map[String, 
String]])
case class PrimWithOption(data: Option[String])
case class ArrayWithOption(data: Option[Seq[String]])

class TestOptionWithDataTypes
  extends FunSuite
with Matchers
with SharedSparkContext
with DataFrameSuiteBase {

  val logger = LoggerFactory.getLogger(classOf[TestOptionWithDataTypes])


  test("test immutable map") {
import sqlContext.implicits._
val rdd = sc.parallelize(Seq(ImmutableMapTest(Map("1"->"2"

val result = Try {
  rdd.toDF()
} match {
  case Success(e) => Option(e)
  case Failure(e) => {
logger.error(e.getMessage, e)
None
  }
}

result should not be(None)
result.get.count() should be(1)
result.get.show()
  }

  test("test mutable Map") {
import sqlContext.implicits._
val rdd = 

[jira] [Updated] (SPARK-17669) Strange behavior using Datasets

2016-09-26 Thread Miles Crawford (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Miles Crawford updated SPARK-17669:
---
Summary: Strange behavior using Datasets  (was: Strange UI behavior using 
Datasets)

> Strange behavior using Datasets
> ---
>
> Key: SPARK-17669
> URL: https://issues.apache.org/jira/browse/SPARK-17669
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Web UI
>Affects Versions: 2.0.0
>Reporter: Miles Crawford
>
> I recently migrated my application to Spark 2.0, and everything worked well, 
> except for one function that uses "toDS" and the ML libraries.
> This stage used to complete in 15 minutes or so on 1.6.2, and now takes 
> almost two hours.
> The UI shows very strange behavior - completed stages still being worked on, 
> concurrent work on tons of stages, including ones from downstream jobs:
> https://dl.dropboxusercontent.com/u/231152/spark.png
> The only source change I made was changing "toDF" to "toDS()" before handing 
> my RDDs to the ML libraries.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17669) Strange UI behavior using Datasets

2016-09-26 Thread Miles Crawford (JIRA)
Miles Crawford created SPARK-17669:
--

 Summary: Strange UI behavior using Datasets
 Key: SPARK-17669
 URL: https://issues.apache.org/jira/browse/SPARK-17669
 Project: Spark
  Issue Type: Bug
  Components: SQL, Web UI
Affects Versions: 2.0.0
Reporter: Miles Crawford


I recently migrated my application to Spark 2.0, and everything worked well, 
except for one function that uses "toDS" and the ML libraries.

This stage used to complete in 15 minutes or so on 1.6.2, and now takes almost 
two hours.

The UI shows very strange behavior - completed stages still being worked on, 
concurrent work on tons of stages, including ones from downstream jobs:
https://dl.dropboxusercontent.com/u/231152/spark.png

The only source change I made was changing "toDF" to "toDS()" before handing my 
RDDs to the ML libraries.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8824) Support Parquet time related logical types

2016-09-26 Thread Nate Sammons (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524073#comment-15524073
 ] 

Nate Sammons commented on SPARK-8824:
-

Any progress on these items?  Specifically TIMESTAMP_MILLIS for us would be 
great, as it would make both Spark and Presto viable for us.  If we were 
voting, I would vote for TIMESTAMP_MILLIS support first, then TIMESTAMP_MICROS.

> Support Parquet time related logical types
> --
>
> Key: SPARK-8824
> URL: https://issues.apache.org/jira/browse/SPARK-8824
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 1.5.0
>Reporter: Cheng Lian
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17652) Fix confusing exception message while reserving capacity

2016-09-26 Thread Yin Huai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yin Huai updated SPARK-17652:
-
Assignee: Sameer Agarwal

> Fix confusing exception message while reserving capacity
> 
>
> Key: SPARK-17652
> URL: https://issues.apache.org/jira/browse/SPARK-17652
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: Sameer Agarwal
>Assignee: Sameer Agarwal
>Priority: Minor
> Fix For: 2.0.2, 2.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-17652) Fix confusing exception message while reserving capacity

2016-09-26 Thread Yin Huai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yin Huai resolved SPARK-17652.
--
   Resolution: Fixed
Fix Version/s: 2.1.0
   2.0.2

Issue resolved by pull request 15225
[https://github.com/apache/spark/pull/15225]

> Fix confusing exception message while reserving capacity
> 
>
> Key: SPARK-17652
> URL: https://issues.apache.org/jira/browse/SPARK-17652
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: Sameer Agarwal
>Priority: Minor
> Fix For: 2.0.2, 2.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-17153) [Structured streams] readStream ignores partition columns

2016-09-26 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust resolved SPARK-17153.
--
   Resolution: Fixed
Fix Version/s: 2.1.0

Issue resolved by pull request 14803
[https://github.com/apache/spark/pull/14803]

> [Structured streams] readStream ignores partition columns
> -
>
> Key: SPARK-17153
> URL: https://issues.apache.org/jira/browse/SPARK-17153
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 2.0.0
>Reporter: Dmitri Carpov
> Fix For: 2.1.0
>
>
> When parquet files are persisted using partitions, spark's `readStream` 
> returns data with all `null`s for the partitioned columns.
> For example:
> {noformat}
> case class A(id: Int, value: Int)
> val data = spark.createDataset(Seq(
>   A(1, 1), 
>   A(2, 2), 
>   A(2, 3))
> )
> val url = "/mnt/databricks/test"
> data.write.partitionBy("id").parquet(url)
> {noformat}
> when data is read as stream:
> {noformat}
> spark.readStream.schema(spark.read.load(url).schema).parquet(url)
> {noformat}
> it reads:
> {noformat}
> id, value
> null, 1
> null, 2
> null, 3
> {noformat}
> A possible reason is that `readStream` reads the parquet files directly, but when 
> those are stored, the columns they are partitioned by are excluded from the file 
> itself. In the given example the parquet files contain only the `value` column, 
> since `id` is the partition column.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17668) Support representing structs with case classes and tuples in spark sql udf inputs

2016-09-26 Thread koert kuipers (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

koert kuipers updated SPARK-17668:
--
Summary: Support representing structs with case classes and tuples in spark 
sql udf inputs  (was: Support case classes and tuples to represent structs in 
spark sql udfs)

> Support representing structs with case classes and tuples in spark sql udf 
> inputs
> -
>
> Key: SPARK-17668
> URL: https://issues.apache.org/jira/browse/SPARK-17668
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: koert kuipers
>Priority: Minor
>
> After having gotten used to having case classes represent complex structures in 
> Datasets, I am surprised to find out that when I work in DataFrames with UDFs 
> no such magic exists, and I have to fall back to manipulating Row objects, 
> which is error prone and somewhat ugly.
> for example:
> {noformat}
> case class Person(name: String, age: Int)
> val df = Seq((Person("john", 33), 5), (Person("mike", 30), 6)).toDF("person", 
> "id")
> val df1 = df.withColumn("person", udf({ (p: Person) => p.copy(age = p.age + 
> 1) }).apply(col("person")))
> df1.printSchema
> df1.show
> {noformat}
> leads to:
> {noformat}
> java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
> to Person
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17668) Support case classes and tuples to represent structs in spark sql udfs

2016-09-26 Thread koert kuipers (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523989#comment-15523989
 ] 

koert kuipers commented on SPARK-17668:
---

original conversation is here:
https://www.mail-archive.com/user@spark.apache.org/msg57338.html

> Support case classes and tuples to represent structs in spark sql udfs
> --
>
> Key: SPARK-17668
> URL: https://issues.apache.org/jira/browse/SPARK-17668
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: koert kuipers
>Priority: Minor
>
> After having gotten used to having case classes represent complex structures in 
> Datasets, I am surprised to find out that when I work in DataFrames with UDFs 
> no such magic exists, and I have to fall back to manipulating Row objects, 
> which is error prone and somewhat ugly.
> for example:
> {noformat}
> case class Person(name: String, age: Int)
> val df = Seq((Person("john", 33), 5), (Person("mike", 30), 6)).toDF("person", 
> "id")
> val df1 = df.withColumn("person", udf({ (p: Person) => p.copy(age = p.age + 
> 1) }).apply(col("person")))
> df1.printSchema
> df1.show
> {noformat}
> leads to:
> {noformat}
> java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
> to Person
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17668) Support case classes and tuples to represent structs in spark sql udfs

2016-09-26 Thread koert kuipers (JIRA)
koert kuipers created SPARK-17668:
-

 Summary: Support case classes and tuples to represent structs in 
spark sql udfs
 Key: SPARK-17668
 URL: https://issues.apache.org/jira/browse/SPARK-17668
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 2.0.0
Reporter: koert kuipers
Priority: Minor


After having gotten used to having case classes represent complex structures in 
Datasets, I am surprised to find out that when I work in DataFrames with UDFs 
no such magic exists, and I have to fall back to manipulating Row objects, 
which is error prone and somewhat ugly.

for example:
{noformat}
case class Person(name: String, age: Int)

val df = Seq((Person("john", 33), 5), (Person("mike", 30), 6)).toDF("person", 
"id")
val df1 = df.withColumn("person", udf({ (p: Person) => p.copy(age = p.age + 1) 
}).apply(col("person")))
df1.printSchema
df1.show
{noformat}
leads to:
{noformat}
java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
to Person
{noformat}
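For reference, a minimal sketch of the Row-based workaround alluded to above, assuming the same Person/df definitions from the example: the UDF receives the struct column as a Row and has to pull fields out by name, and the returned tuple comes back as a struct with generic {{_1}}/{{_2}} field names, which is part of what makes this approach awkward.

{code}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// The struct column arrives as a Row; fields must be extracted manually.
val bumpAge = udf { (p: Row) =>
  (p.getAs[String]("name"), p.getAs[Int]("age") + 1)
}

// Note: the resulting struct's fields are named _1/_2, not name/age.
val df2 = df.withColumn("person", bumpAge(col("person")))
df2.printSchema
df2.show
{code}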



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17666) take() or isEmpty() on dataset leaks s3a connections

2016-09-26 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523967#comment-15523967
 ] 

Sean Owen commented on SPARK-17666:
---

Agree, it might happen to help some cases or even this one, but it can't help all 
cases. Does a finalize() help at all? Only if somehow the iterator were 
regularly GCed well before the underlying stream or something, but I don't see 
how.

> take() or isEmpty() on dataset leaks s3a connections
> 
>
> Key: SPARK-17666
> URL: https://issues.apache.org/jira/browse/SPARK-17666
> Project: Spark
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 2.0.0
> Environment: ubuntu/centos, java 7, java 8, spark 2.0, java api
>Reporter: Igor Berman
>
> I'm experiencing problems with s3a when working with parquet via the dataset api.
> The symptom of the problem: tasks failing with
> {code}
> Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout 
> waiting for connection from pool
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:232)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:199)
>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> {code}
> Checking CoarseGrainedExecutorBackend with lsof gave me many sockets in 
> CLOSE_WAIT state
> reproduction of problem:
> {code}
> package com.test;
> import java.text.ParseException;
> import org.apache.spark.SparkConf;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> public class ConnectionLeakTest {
>   public static void main(String[] args) throws ParseException {
>   SparkConf sparkConf = new SparkConf();
>   sparkConf.setMaster("local[*]");
>   sparkConf.setAppName("Test");
>   sparkConf.set("spark.local.dir", "/tmp/spark");
>   sparkConf.set("spark.sql.shuffle.partitions", "2");
>   SparkSession session = 
> SparkSession.builder().config(sparkConf).getOrCreate();
> //set your credentials to your bucket
>   for (int i = 0; i < 100; i++) {
>   Dataset df = session
>   .sqlContext()
>   .read()
>   .parquet("s3a://test/*");//contains 
> multiple snappy compressed parquet files
>   if (df.rdd().isEmpty()) {//same problem with 
> takeAsList().isEmpty()
>   System.out.println("Yes");
>   } else {
>   System.out.println("No");
>   }
>   }
>   System.out.println("Done");
>   }
> }
> {code}
> So when the program runs, you can find the pid with jps and do lsof -p  | grep https,
> and you'll see constant growth of CLOSE_WAITs.
> Our way to bypass the problem is to use count() == 0.
> In addition we've seen that df.dropDuplicates("a","b").rdd().isEmpty()
> doesn't produce the problem either.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17667) Make locking fine grained in YarnAllocator#enqueueGetLossReasonRequest

2016-09-26 Thread Ashwin Shankar (JIRA)
Ashwin Shankar created SPARK-17667:
--

 Summary: Make locking fine grained in 
YarnAllocator#enqueueGetLossReasonRequest
 Key: SPARK-17667
 URL: https://issues.apache.org/jira/browse/SPARK-17667
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 2.0.0, 1.6.2
Reporter: Ashwin Shankar


Following up on the discussion in SPARK-15725, one of the reasons for the AM hanging 
with dynamic allocation (DA) is the way locking is done in YarnAllocator. We 
noticed that when executors go down during the shrink phase of DA, the AM gets 
locked up. On taking a thread dump, we see threads trying to get the loss reason 
via YarnAllocator#enqueueGetLossReasonRequest, and they are all BLOCKED waiting 
for the lock acquired by the allocate call. This gets worse when the number of 
executors going down is in the thousands, and I've seen the AM hang for minutes. 
This jira is created to make the locking a little more fine-grained by 
remembering the executors that were killed via the AM, and then serving the 
GetExecutorLossReason requests with that information.
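A rough Scala sketch of that idea (an editorial illustration with hypothetical names, not the actual change): keep the IDs of executors the AM itself released in a thread-safe set that the loss-reason path can consult without taking the allocator lock.

{code}
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: records executors released by the AM so that
// GetExecutorLossReason requests can be answered without blocking on the
// lock held by allocate().
class AmKilledExecutors {
  private val killed = ConcurrentHashMap.newKeySet[String]()

  def markKilledByAm(executorId: String): Unit = killed.add(executorId)

  // Lock-free read path for the common "we killed it ourselves" case.
  def wasKilledByAm(executorId: String): Boolean = killed.contains(executorId)
}
{code}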



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17666) take() or isEmpty() on dataset leaks s3a connections

2016-09-26 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523921#comment-15523921
 ] 

Josh Rosen commented on SPARK-17666:


I think that one problem with that approach is that any transformation of a 
CompletionIterator will yield a new iterator which doesn't forward the close() 
call (i.e. if you {{map}} over the iterator then it will break your proposed 
check in {{Task}}).

> take() or isEmpty() on dataset leaks s3a connections
> 
>
> Key: SPARK-17666
> URL: https://issues.apache.org/jira/browse/SPARK-17666
> Project: Spark
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 2.0.0
> Environment: ubuntu/centos, java 7, java 8, spark 2.0, java api
>Reporter: Igor Berman
>
> I'm experiencing problems with s3a when working with parquet via the dataset api.
> The symptom of the problem: tasks failing with
> {code}
> Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout 
> waiting for connection from pool
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:232)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:199)
>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> {code}
> Checking CoarseGrainedExecutorBackend with lsof gave me many sockets in 
> CLOSE_WAIT state
> reproduction of problem:
> {code}
> package com.test;
> import java.text.ParseException;
> import org.apache.spark.SparkConf;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> public class ConnectionLeakTest {
>   public static void main(String[] args) throws ParseException {
>   SparkConf sparkConf = new SparkConf();
>   sparkConf.setMaster("local[*]");
>   sparkConf.setAppName("Test");
>   sparkConf.set("spark.local.dir", "/tmp/spark");
>   sparkConf.set("spark.sql.shuffle.partitions", "2");
>   SparkSession session = 
> SparkSession.builder().config(sparkConf).getOrCreate();
> //set your credentials to your bucket
>   for (int i = 0; i < 100; i++) {
>   Dataset df = session
>   .sqlContext()
>   .read()
>   .parquet("s3a://test/*");//contains 
> multiple snappy compressed parquet files
>   if (df.rdd().isEmpty()) {//same problem with 
> takeAsList().isEmpty()
>   System.out.println("Yes");
>   } else {
>   System.out.println("No");
>   }
>   }
>   System.out.println("Done");
>   }
> }
> {code}
> So when the program runs, you can find the pid with jps and do lsof -p  | grep https,
> and you'll see constant growth of CLOSE_WAITs.
> Our way to bypass the problem is to use count() == 0.
> In addition we've seen that df.dropDuplicates("a","b").rdd().isEmpty()
> doesn't produce the problem either.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17665) SparkR does not support options in other types consistently other APIs

2016-09-26 Thread Felix Cheung (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523844#comment-15523844
 ] 

Felix Cheung commented on SPARK-17665:
--

Supporting just character and logical seems fine.
AFAIK we don't support numerical values in Spark properties (only their string 
form), so we might not need to take the additional steps there.

> SparkR does not support options in other types consistently other APIs
> --
>
> Key: SPARK-17665
> URL: https://issues.apache.org/jira/browse/SPARK-17665
> Project: Spark
>  Issue Type: Improvement
>  Components: SparkR
>Affects Versions: 2.0.0
>Reporter: Hyukjin Kwon
>Priority: Minor
>
> Currently, SparkR only supports a string as an option value in some APIs such as 
> `read.df`/`write.df`, etc.
> It'd be great if they supported other types consistently with the 
> Python/Scala/Java/SQL APIs.
> - Python supports all types but converts it to string
> - Scala/Java/SQL - Long/Boolean/String/Double.
> Currently, 
> {code}
> > read.df("text.json", "csv", inferSchema=FALSE)
> {code}
> throws an exception as below:
> {code}
> Error in value[[3L]](cond) :
>   Error in invokeJava(isStatic = TRUE, className, methodName, ...): 
> java.lang.ClassCastException: java.lang.Boolean cannot be cast to 
> java.lang.String
>   at 
> org.apache.spark.sql.internal.SessionState$$anonfun$newHadoopConfWithOptions$1.apply(SessionState.scala:59)
>   at 
> org.apache.spark.sql.internal.SessionState$$anonfun$newHadoopConfWithOptions$1.apply(SessionState.scala:59)
>   at scala.collection.immutable.Map$Map3.foreach(Map.scala:161)
>   at 
> org.apache.spark.sql.internal.SessionState.newHadoopConfWithOptions(SessionState.scala:59)
>   at 
> org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog.(PartitioningAwareFileCatalog.scala:45)
>   at 
> org.apache.spark.sql.execution.datasources.ListingFileCatalog.(ListingFileCatalog.scala:45)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:401)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
>   at org.apache.spark.sql.DataFrameReader.lo
> {code}
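For comparison, a small Scala sketch of the typed overloads on that side (the csv path is a made-up placeholder, and a local SparkSession is assumed): DataFrameReader.option accepts Boolean/Long/Double values in addition to String, which is the consistency the issue asks for in SparkR.

{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("options").getOrCreate()

// Boolean overload of option(); the R call above passes the same value as a
// logical and currently fails with the ClassCastException shown.
val df = spark.read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
  .load("/tmp/people.csv")   // placeholder path, assumed to exist

df.printSchema
{code}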



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17638) Stop JVM StreamingContext when the Python process is dead

2016-09-26 Thread Shixiong Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu updated SPARK-17638:
-
Fix Version/s: (was: 2.0.2)
   2.0.1

> Stop JVM StreamingContext when the Python process is dead
> -
>
> Key: SPARK-17638
> URL: https://issues.apache.org/jira/browse/SPARK-17638
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>Priority: Minor
> Fix For: 2.0.1, 2.1.0
>
>
> When the Python process is dead, the JVM StreamingContext is still running. 
> Hence we will see a lot of Py4jException before the JVM process exits. It's 
> better to stop the JVM StreamingContext to avoid those annoying logs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-17666) take() or isEmpty() on dataset leaks s3a connections

2016-09-26 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523799#comment-15523799
 ] 

Sean Owen edited comment on SPARK-17666 at 9/26/16 6:24 PM:


Hm, I wonder if a couple problems of this form could be solved by handling the 
case of a CompletionIterator in ResultTask.runTask, which is the thing that 
would eventually get a CompletionIterator from something and process it, if 
anything, in this case. It could call completion() when it knows it's done. I 
might give that a shot to see if it works at all; not sure that's the right fix.


was (Author: srowen):
Hm, I wonder if a couple problems of this form could be solved by handling the 
case of a CompletionIterator in SparkContext.runJob(), which is the thing that 
would eventually get a CompletionIterator from something and process it, if 
anything, in this case. It could call completion() when it knows it's done. I 
might give that a shot to see if it works at all; not sure that's the right fix.

> take() or isEmpty() on dataset leaks s3a connections
> 
>
> Key: SPARK-17666
> URL: https://issues.apache.org/jira/browse/SPARK-17666
> Project: Spark
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 2.0.0
> Environment: ubuntu/centos, java 7, java 8, spark 2.0, java api
>Reporter: Igor Berman
>
> I'm experiencing problems with s3a when working with parquet via the dataset api.
> The symptom of the problem: tasks failing with
> {code}
> Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout 
> waiting for connection from pool
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:232)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:199)
>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> {code}
> Checking CoarseGrainedExecutorBackend with lsof gave me many sockets in 
> CLOSE_WAIT state
> reproduction of problem:
> {code}
> package com.test;
> import java.text.ParseException;
> import org.apache.spark.SparkConf;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> public class ConnectionLeakTest {
>   public static void main(String[] args) throws ParseException {
>   SparkConf sparkConf = new SparkConf();
>   sparkConf.setMaster("local[*]");
>   sparkConf.setAppName("Test");
>   sparkConf.set("spark.local.dir", "/tmp/spark");
>   sparkConf.set("spark.sql.shuffle.partitions", "2");
>   SparkSession session = 
> SparkSession.builder().config(sparkConf).getOrCreate();
> //set your credentials to your bucket
>   for (int i = 0; i < 100; i++) {
>   Dataset df = session
>   .sqlContext()
>   .read()
>   .parquet("s3a://test/*");//contains 
> multiple snappy compressed parquet files
>   if (df.rdd().isEmpty()) {//same problem with 
> takeAsList().isEmpty()
>   System.out.println("Yes");
>   } else {
>   System.out.println("No");
>   }
>   }
>   System.out.println("Done");
>   }
> }
> {code}
> So when the program runs, you can find the pid with jps and do lsof -p  | grep https,
> and you'll see constant growth of CLOSE_WAITs.
> Our way to bypass the problem is to use count() == 0.
> In addition we've seen that df.dropDuplicates("a","b").rdd().isEmpty()
> doesn't produce the problem either.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17666) take() or isEmpty() on dataset leaks s3a connections

2016-09-26 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523799#comment-15523799
 ] 

Sean Owen commented on SPARK-17666:
---

Hm, I wonder if a couple problems of this form could be solved by handling the 
case of a CompletionIterator in SparkContext.runJob(), which is the thing that 
would eventually get a CompletionIterator from something and process it, if 
anything, in this case. It could call completion() when it knows it's done. I 
might give that a shot to see if it works at all; not sure that's the right fix.

> take() or isEmpty() on dataset leaks s3a connections
> 
>
> Key: SPARK-17666
> URL: https://issues.apache.org/jira/browse/SPARK-17666
> Project: Spark
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 2.0.0
> Environment: ubuntu/centos, java 7, java 8, spark 2.0, java api
>Reporter: Igor Berman
>
> I'm experiencing problems with s3a when working with parquet via the dataset api.
> The symptom of the problem: tasks failing with
> {code}
> Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout 
> waiting for connection from pool
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:232)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:199)
>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> {code}
> Checking CoarseGrainedExecutorBackend with lsof gave me many sockets in 
> CLOSE_WAIT state
> reproduction of problem:
> {code}
> package com.test;
> import java.text.ParseException;
> import org.apache.spark.SparkConf;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> public class ConnectionLeakTest {
>   public static void main(String[] args) throws ParseException {
>   SparkConf sparkConf = new SparkConf();
>   sparkConf.setMaster("local[*]");
>   sparkConf.setAppName("Test");
>   sparkConf.set("spark.local.dir", "/tmp/spark");
>   sparkConf.set("spark.sql.shuffle.partitions", "2");
>   SparkSession session = 
> SparkSession.builder().config(sparkConf).getOrCreate();
> //set your credentials to your bucket
>   for (int i = 0; i < 100; i++) {
>   Dataset df = session
>   .sqlContext()
>   .read()
>   .parquet("s3a://test/*");//contains 
> multiple snappy compressed parquet files
>   if (df.rdd().isEmpty()) {//same problem with 
> takeAsList().isEmpty()
>   System.out.println("Yes");
>   } else {
>   System.out.println("No");
>   }
>   }
>   System.out.println("Done");
>   }
> }
> {code}
> So when the program runs, you can find the pid with jps and do lsof -p  | grep https,
> and you'll see constant growth of CLOSE_WAITs.
> Our way to bypass the problem is to use count() == 0.
> In addition we've seen that df.dropDuplicates("a","b").rdd().isEmpty()
> doesn't produce the problem either.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17649) Log how many Spark events got dropped in LiveListenerBus

2016-09-26 Thread Shixiong Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu updated SPARK-17649:
-
Fix Version/s: 1.6.3

> Log how many Spark events got dropped in LiveListenerBus
> 
>
> Key: SPARK-17649
> URL: https://issues.apache.org/jira/browse/SPARK-17649
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>Priority: Minor
> Fix For: 1.6.3, 2.0.2, 2.1.0
>
>
> Log how many Spark events got dropped in LiveListenerBus so that the user can 
> get insights on how to set a correct event queue size.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17666) take() or isEmpty() on dataset leaks s3a connections

2016-09-26 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523742#comment-15523742
 ] 

Josh Rosen commented on SPARK-17666:


My hunch is that there's cleanup which is performed in a {{CompletionIterator}} 
once that iterator is completely consumed, but that there is no final "safety 
net" cleanup logic to ensure cleanup if the iterator is _not_ fully consumed 
(which is what happens in take).
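A toy Scala illustration of that hunch (a simplified stand-in, not Spark's actual {{CompletionIterator}}): the completion callback only fires once the wrapped iterator reports it has no more elements, so a take() that stops early never triggers it.

{code}
// Simplified stand-in for the real class: runs `completion` only when the
// wrapped iterator is drained.
class SimpleCompletionIterator[A](sub: Iterator[A], completion: () => Unit) extends Iterator[A] {
  private var done = false
  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !done) { done = true; completion() }   // cleanup, e.g. closing a stream
    more
  }
  override def next(): A = sub.next()
}

val it = new SimpleCompletionIterator(Iterator(1, 2, 3), () => println("closed"))
println(it.take(2).toList)   // prints List(1, 2); "closed" is never printed
{code}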

> take() or isEmpty() on dataset leaks s3a connections
> 
>
> Key: SPARK-17666
> URL: https://issues.apache.org/jira/browse/SPARK-17666
> Project: Spark
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 2.0.0
> Environment: ubuntu/centos, java 7, java 8, spark 2.0, java api
>Reporter: Igor Berman
>
> I'm experiencing problems with s3a when working with parquet via the dataset api.
> The symptom of the problem: tasks failing with
> {code}
> Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout 
> waiting for connection from pool
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:232)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:199)
>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> {code}
> Checking CoarseGrainedExecutorBackend with lsof gave me many sockets in 
> CLOSE_WAIT state
> reproduction of problem:
> {code}
> package com.test;
> import java.text.ParseException;
> import org.apache.spark.SparkConf;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> public class ConnectionLeakTest {
>   public static void main(String[] args) throws ParseException {
>   SparkConf sparkConf = new SparkConf();
>   sparkConf.setMaster("local[*]");
>   sparkConf.setAppName("Test");
>   sparkConf.set("spark.local.dir", "/tmp/spark");
>   sparkConf.set("spark.sql.shuffle.partitions", "2");
>   SparkSession session = 
> SparkSession.builder().config(sparkConf).getOrCreate();
> //set your credentials to your bucket
>   for (int i = 0; i < 100; i++) {
>   Dataset df = session
>   .sqlContext()
>   .read()
>   .parquet("s3a://test/*");//contains 
> multiple snappy compressed parquet files
>   if (df.rdd().isEmpty()) {//same problem with 
> takeAsList().isEmpty()
>   System.out.println("Yes");
>   } else {
>   System.out.println("No");
>   }
>   }
>   System.out.println("Done");
>   }
> }
> {code}
> So when the program runs, you can find the pid with jps and do lsof -p  | grep https,
> and you'll see constant growth of CLOSE_WAITs.
> Our way to bypass the problem is to use count() == 0.
> In addition we've seen that df.dropDuplicates("a","b").rdd().isEmpty()
> doesn't produce the problem either.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-17649) Log how many Spark events got dropped in LiveListenerBus

2016-09-26 Thread Shixiong Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu resolved SPARK-17649.
--
   Resolution: Fixed
Fix Version/s: 2.1.0
   2.0.2

> Log how many Spark events got dropped in LiveListenerBus
> 
>
> Key: SPARK-17649
> URL: https://issues.apache.org/jira/browse/SPARK-17649
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>Priority: Minor
> Fix For: 2.0.2, 2.1.0
>
>
> Log how many Spark events got dropped in LiveListenerBus so that the user can 
> get insights on how to set a correct event queue size.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17454) Use Mesos disk resources

2016-09-26 Thread Michael Gummelt (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Gummelt updated SPARK-17454:

Summary: Use Mesos disk resources  (was: Add option to specify Mesos 
resource offer constraints)

> Use Mesos disk resources
> 
>
> Key: SPARK-17454
> URL: https://issues.apache.org/jira/browse/SPARK-17454
> Project: Spark
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 2.0.0
>Reporter: Chris Bannister
>
> Currently the driver will accept offers from Mesos which have enough ram for 
> the executor, until its max cores is reached. There is no way to control the 
> required CPUs or disk for each executor; it would be very useful to be able 
> to apply something similar to spark.mesos.constraints to resource offers 
> instead of to attributes on the offer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-17647) SQL LIKE does not handle backslashes correctly

2016-09-26 Thread Xiangrui Meng (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523623#comment-15523623
 ] 

Xiangrui Meng edited comment on SPARK-17647 at 9/26/16 5:07 PM:


Thanks [~joshrosen]! I updated the JIRA description. The LIKE escaping 
behaviors in MySQL/PostgreSQL are documented here:

* MySQL: 
http://dev.mysql.com/doc/refman/5.7/en/string-comparison-functions.html#operator_like
* PostgreSQL: https://www.postgresql.org/docs/8.3/static/functions-matching.html

In particular, MySQL:

{noformat}
Exception: At the end of the pattern string, backslash can be specified as “\\”.
At the end of the string, backslash stands for itself because there is nothing 
following to escape.
{noformat}

That explains why MySQL returns true for both

{code}
'\\' like ''
'\\' like '\\'
{code}


was (Author: mengxr):
Thanks [~joshrosen]! I updated the JIRA description. The LIKE escaping 
behaviors in MySQL/PostgreSQL are documented here:

* MySQL: 
http://dev.mysql.com/doc/refman/5.7/en/string-comparison-functions.html#operator_like
* PostgreSQL: https://www.postgresql.org/docs/8.3/static/functions-matching.html

In particular, MySQL:

{noformat}
Exception: At the end of the pattern string, backslash can be specified as “\\”.
At the end of the string, backslash stands for itself because there is nothing 
following to escape.
{noformat}

That explains why MySQL returns true for both `\\` like `` and `\\` like 
`\\`.

> SQL LIKE does not handle backslashes correctly
> --
>
> Key: SPARK-17647
> URL: https://issues.apache.org/jira/browse/SPARK-17647
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Reporter: Xiangrui Meng
>  Labels: correctness
>
> Try the following in SQL shell:
> {code}
> select '' like '%\\%';
> {code}
> It returned false, which is wrong.
> cc: [~yhuai] [~joshrosen]
> A false-negative considered previously:
> {code}
> select '' rlike '.*.*';
> {code}
> It returned true, which is correct if we assume that the pattern is treated 
> as a Java string but not raw string.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


