[jira] [Updated] (SPARK-37061) Custom V2 Metrics uses wrong classname for lookup

2021-10-19 Thread Russell Spitzer (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Spitzer updated SPARK-37061:

Description: 
Currently CustomMetrics uses `getCanonicalName` to get the metric type name

https://github.com/apache/spark/blob/38493401d18d42a6cb176bf515536af97ba1338b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala#L31-L33

But when using reflection we need to use the original type name.

Here is an example when working with an inner class

{code:java title="CanonicalName vs Name"}
Class.getName = 
org.apache.iceberg.spark.source.SparkBatchScan$FilesScannedMetric
Class.getCanonicalName =
org.apache.iceberg.spark.source.SparkBatchScan.FilesScannedMetric
{code}


The "$" name is required to look up this class while the "." version will fail 
with CNF.

  was:
Currently CustomMetrics uses `getCanonicalName` to get the metric type name

https://github.com/apache/spark/blob/38493401d18d42a6cb176bf515536af97ba1338b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala#L31-L33

But when using reflection we need to use the original type name.

Here is an example when working with an inner class

```
Class.getName = 
org.apache.iceberg.spark.source.SparkBatchScan$FilesScannedMetric
Class.getCanonicalName =
org.apache.iceberg.spark.source.SparkBatchScan.FilesScannedMetric
```

The "$" name is required to look up this class while the "." version will fail 
with CNF.


> Custom V2 Metrics uses wrong classname for lookup
> -
>
> Key: SPARK-37061
> URL: https://issues.apache.org/jira/browse/SPARK-37061
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Russell Spitzer
>Priority: Minor
>
> Currently CustomMetrics uses `getCanonicalName` to get the metric type name
> https://github.com/apache/spark/blob/38493401d18d42a6cb176bf515536af97ba1338b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala#L31-L33
> But when using reflection we need to use the original type name.
> Here is an example when working with an inner class
> {code:java title="CanonicalName vs Name"}
> Class.getName = 
> org.apache.iceberg.spark.source.SparkBatchScan$FilesScannedMetric
> Class.getCanonicalName =
> org.apache.iceberg.spark.source.SparkBatchScan.FilesScannedMetric
> {code}
> The "$" name is required to look up this class while the "." version will 
> fail with CNF.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37061) Custom V2 Metrics uses wrong classname in for lookup

2021-10-19 Thread Russell Spitzer (Jira)
Russell Spitzer created SPARK-37061:
---

 Summary: Custom V2 Metrics uses wrong classname in for lookup
 Key: SPARK-37061
 URL: https://issues.apache.org/jira/browse/SPARK-37061
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0
Reporter: Russell Spitzer


Currently CustomMetrics uses `getCanonicalName` to get the metric type name

https://github.com/apache/spark/blob/38493401d18d42a6cb176bf515536af97ba1338b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala#L31-L33

But when using reflection we need to use the original type name.

Here is an example when working with an inner class

```
Class.getName = 
org.apache.iceberg.spark.source.SparkBatchScan$FilesScannedMetric
Class.getCanonicalName =
org.apache.iceberg.spark.source.SparkBatchScan.FilesScannedMetric
```

The "$" name is required to look up this class while the "." version will fail 
with CNF.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-37061) Custom V2 Metrics uses wrong classname for lookup

2021-10-19 Thread Russell Spitzer (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Spitzer updated SPARK-37061:

Summary: Custom V2 Metrics uses wrong classname for lookup  (was: Custom V2 
Metrics uses wrong classname in for lookup)

> Custom V2 Metrics uses wrong classname for lookup
> -
>
> Key: SPARK-37061
> URL: https://issues.apache.org/jira/browse/SPARK-37061
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Russell Spitzer
>Priority: Minor
>
> Currently CustomMetrics uses `getCanonicalName` to get the metric type name
> https://github.com/apache/spark/blob/38493401d18d42a6cb176bf515536af97ba1338b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala#L31-L33
> But when using reflection we need to use the original type name.
> Here is an example when working with an inner class
> ```
> Class.getName = 
> org.apache.iceberg.spark.source.SparkBatchScan$FilesScannedMetric
> Class.getCanonicalName =
> org.apache.iceberg.spark.source.SparkBatchScan.FilesScannedMetric
> ```
> The "$" name is required to look up this class while the "." version will 
> fail with CNF.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-33041) Better error messages when PySpark Java Gateway Crashes

2020-09-30 Thread Russell Spitzer (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17205059#comment-17205059
 ] 

Russell Spitzer commented on SPARK-33041:
-

To elaborate, this could be the case for any failure that occurs after the 
connection file is written. For example, say the OOM killer comes in and shuts 
down the gateway or there is some other fatal failure of the gateway. These 
cases would also result in the rather opaque messages about queues and 
networking, when we know the actual problem is that the gateway is shutdown.

> Better error messages when PySpark Java Gateway Crashes
> ---
>
> Key: SPARK-33041
> URL: https://issues.apache.org/jira/browse/SPARK-33041
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 2.4.7, 3.0.1
>Reporter: Russell Spitzer
>Priority: Major
>
> Currently the startup works by opening the Gateway process and waiting until 
> the the process has written the conn_info_file. Once the conn_file is written 
> it proceeds to attempt to connect to the port.
> This connection can succeed and the process can start normally, but if the 
> gateway process dies or is killed the error that the user ends up getting is 
> a confusing "connection_failed" style error like
> {code}
> Traceback (most recent call last):
>   File 
> "/usr/lib/spark-packages/spark2.4.4/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
>  line 929, in _get_connection
> connection = self.deque.pop()
> IndexError: pop from an empty deque
> {code}
> Since we have a handle on the py4j process, we should probably check whether 
> it has terminated before surfacing any exceptions like this. 
> CC [~holden]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33041) Better error messages when PySpark Java Gateway Crashes

2020-09-30 Thread Russell Spitzer (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Spitzer updated SPARK-33041:

Affects Version/s: 3.0.1

> Better error messages when PySpark Java Gateway Crashes
> ---
>
> Key: SPARK-33041
> URL: https://issues.apache.org/jira/browse/SPARK-33041
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 2.4.7, 3.0.1
>Reporter: Russell Spitzer
>Priority: Major
>
> Currently the startup works by opening the Gateway process and waiting until 
> the the process has written the conn_info_file. Once the conn_file is written 
> it proceeds to attempt to connect to the port.
> This connection can succeed and the process can start normally, but if the 
> gateway process dies or is killed the error that the user ends up getting is 
> a confusing "connection_failed" style error like
> {code}
> Traceback (most recent call last):
>   File 
> "/usr/lib/spark-packages/spark2.4.4/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
>  line 929, in _get_connection
> connection = self.deque.pop()
> IndexError: pop from an empty deque
> {code}
> Since we have a handle on the py4j process, we should probably check whether 
> it has terminated before surfacing any exceptions like this. 
> CC [~holden]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33041) Better error messages when PySpark Java Gateway Crashes

2020-09-30 Thread Russell Spitzer (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Spitzer updated SPARK-33041:

Summary: Better error messages when PySpark Java Gateway Crashes  (was: 
Better error messages when PySpark Java Gateway Fails to Start or Crashes)

> Better error messages when PySpark Java Gateway Crashes
> ---
>
> Key: SPARK-33041
> URL: https://issues.apache.org/jira/browse/SPARK-33041
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 2.4.7
>Reporter: Russell Spitzer
>Priority: Major
>
> Currently the startup works by opening the Gateway process and waiting until 
> the the process has written the conn_info_file. Once the conn_file is written 
> it proceeds to attempt to connect to the port.
> This connection can succeed and the process can start normally, but if the 
> gateway process dies or is killed the error that the user ends up getting is 
> a confusing "connection_failed" style error like
> {code}
> Traceback (most recent call last):
>   File 
> "/usr/lib/spark-packages/spark2.4.4/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
>  line 929, in _get_connection
> connection = self.deque.pop()
> IndexError: pop from an empty deque
> {code}
> Since we have a handle on the py4j process, we should probably check whether 
> it has terminated before surfacing any exceptions like this. 
> CC [~holden]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33041) Better error messages when PySpark Java Gateway Fails to Start or Crashes

2020-09-30 Thread Russell Spitzer (Jira)
Russell Spitzer created SPARK-33041:
---

 Summary: Better error messages when PySpark Java Gateway Fails to 
Start or Crashes
 Key: SPARK-33041
 URL: https://issues.apache.org/jira/browse/SPARK-33041
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Affects Versions: 2.4.7
Reporter: Russell Spitzer


Currently the startup works by opening the Gateway process and waiting until 
the the process has written the conn_info_file. Once the conn_file is written 
it proceeds to attempt to connect to the port.

This connection can succeed and the process can start normally, but if the 
gateway process dies or is killed the error that the user ends up getting is a 
confusing "connection_failed" style error like

{code}
Traceback (most recent call last):
  File 
"/usr/lib/spark-packages/spark2.4.4/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
 line 929, in _get_connection
connection = self.deque.pop()
IndexError: pop from an empty deque
{code}

Since we have a handle on the py4j process, we should probably check whether it 
has terminated before surfacing any exceptions like this. 

CC [~holden]




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32977) [SQL] JavaDoc on Default Save mode Incorrect

2020-09-23 Thread Russell Spitzer (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17200930#comment-17200930
 ] 

Russell Spitzer commented on SPARK-32977:
-

[~brkyvz] We talked about this a while back, just submitted the PR to fix the 
doc. Could you please review?

> [SQL] JavaDoc on Default Save mode Incorrect
> 
>
> Key: SPARK-32977
> URL: https://issues.apache.org/jira/browse/SPARK-32977
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.1
>Reporter: Russell Spitzer
>Priority: Major
>
> The JavaDoc says that the default save mode is dependent on DataSource 
> version which is incorrect. It is always ErrorOnExists.
> http://apache-spark-developers-list.1001551.n3.nabble.com/DatasourceV2-Default-Mode-for-DataFrameWriter-not-Dependent-on-DataSource-Version-td29434.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32977) [SQL] JavaDoc on Default Save mode Incorrect

2020-09-23 Thread Russell Spitzer (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Spitzer updated SPARK-32977:

Description: 
The JavaDoc says that the default save mode is dependent on DataSource version 
which is incorrect. It is always ErrorOnExists.

http://apache-spark-developers-list.1001551.n3.nabble.com/DatasourceV2-Default-Mode-for-DataFrameWriter-not-Dependent-on-DataSource-Version-td29434.html

  was:The JavaDoc says that the default save mode is dependent on DataSource 
version which is incorrect. It is always ErrorOnExists.


> [SQL] JavaDoc on Default Save mode Incorrect
> 
>
> Key: SPARK-32977
> URL: https://issues.apache.org/jira/browse/SPARK-32977
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.1
>Reporter: Russell Spitzer
>Priority: Major
>
> The JavaDoc says that the default save mode is dependent on DataSource 
> version which is incorrect. It is always ErrorOnExists.
> http://apache-spark-developers-list.1001551.n3.nabble.com/DatasourceV2-Default-Mode-for-DataFrameWriter-not-Dependent-on-DataSource-Version-td29434.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32977) [SQL] JavaDoc on Default Save mode Incorrect

2020-09-23 Thread Russell Spitzer (Jira)
Russell Spitzer created SPARK-32977:
---

 Summary: [SQL] JavaDoc on Default Save mode Incorrect
 Key: SPARK-32977
 URL: https://issues.apache.org/jira/browse/SPARK-32977
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.1
Reporter: Russell Spitzer


The JavaDoc says that the default save mode is dependent on DataSource version 
which is incorrect. It is always ErrorOnExists.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-25003) Pyspark Does not use Spark Sql Extensions

2019-04-01 Thread Russell Spitzer (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16807121#comment-16807121
 ] 

Russell Spitzer edited comment on SPARK-25003 at 4/1/19 7:48 PM:
-

There was no interest in putting in OSS 2.4 and 2.2, but I did do this backport 
for the Datastax Distribution of Spark 2.4 and I can report it is a relatively 
simple and straightforward process.


was (Author: rspitzer):
There was no interest in putting in OSS 2.4, but I did do this backport for the 
Datastax Distribution of Spark 2.4 and I can report it is a relatively simple 
and straightforward process.

> Pyspark Does not use Spark Sql Extensions
> -
>
> Key: SPARK-25003
> URL: https://issues.apache.org/jira/browse/SPARK-25003
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.2, 2.3.1
>Reporter: Russell Spitzer
>Assignee: Russell Spitzer
>Priority: Major
> Fix For: 3.0.0
>
>
> When creating a SparkSession here
> [https://github.com/apache/spark/blob/v2.2.2/python/pyspark/sql/session.py#L216]
> {code:python}
> if jsparkSession is None:
>   jsparkSession = self._jvm.SparkSession(self._jsc.sc())
> self._jsparkSession = jsparkSession
> {code}
> I believe it ends up calling the constructor here
> https://github.com/apache/spark/blob/v2.2.2/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L85-L87
> {code:scala}
>   private[sql] def this(sc: SparkContext) {
> this(sc, None, None, new SparkSessionExtensions)
>   }
> {code}
> Which creates a new SparkSessionsExtensions object and does not pick up new 
> extensions that could have been set in the config like the companion 
> getOrCreate does.
> https://github.com/apache/spark/blob/v2.2.2/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L928-L944
> {code:scala}
> //in getOrCreate
> // Initialize extensions if the user has defined a configurator class.
> val extensionConfOption = 
> sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
> if (extensionConfOption.isDefined) {
>   val extensionConfClassName = extensionConfOption.get
>   try {
> val extensionConfClass = 
> Utils.classForName(extensionConfClassName)
> val extensionConf = extensionConfClass.newInstance()
>   .asInstanceOf[SparkSessionExtensions => Unit]
> extensionConf(extensions)
>   } catch {
> // Ignore the error if we cannot find the class or when the class 
> has the wrong type.
> case e @ (_: ClassCastException |
>   _: ClassNotFoundException |
>   _: NoClassDefFoundError) =>
>   logWarning(s"Cannot use $extensionConfClassName to configure 
> session extensions.", e)
>   }
> }
> {code}
> I think a quick fix would be to use the getOrCreate method from the companion 
> object instead of calling the constructor from the SparkContext. Or we could 
> fix this by ensuring that all constructors attempt to pick up custom 
> extensions if they are set.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25003) Pyspark Does not use Spark Sql Extensions

2019-04-01 Thread Russell Spitzer (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16807121#comment-16807121
 ] 

Russell Spitzer commented on SPARK-25003:
-

There was no interest in putting in OSS 2.4, but I did do this backport for the 
Datastax Distribution of Spark 2.4 and I can report it is a relatively simple 
and straightforward process.

> Pyspark Does not use Spark Sql Extensions
> -
>
> Key: SPARK-25003
> URL: https://issues.apache.org/jira/browse/SPARK-25003
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.2, 2.3.1
>Reporter: Russell Spitzer
>Assignee: Russell Spitzer
>Priority: Major
> Fix For: 3.0.0
>
>
> When creating a SparkSession here
> [https://github.com/apache/spark/blob/v2.2.2/python/pyspark/sql/session.py#L216]
> {code:python}
> if jsparkSession is None:
>   jsparkSession = self._jvm.SparkSession(self._jsc.sc())
> self._jsparkSession = jsparkSession
> {code}
> I believe it ends up calling the constructor here
> https://github.com/apache/spark/blob/v2.2.2/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L85-L87
> {code:scala}
>   private[sql] def this(sc: SparkContext) {
> this(sc, None, None, new SparkSessionExtensions)
>   }
> {code}
> Which creates a new SparkSessionsExtensions object and does not pick up new 
> extensions that could have been set in the config like the companion 
> getOrCreate does.
> https://github.com/apache/spark/blob/v2.2.2/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L928-L944
> {code:scala}
> //in getOrCreate
> // Initialize extensions if the user has defined a configurator class.
> val extensionConfOption = 
> sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
> if (extensionConfOption.isDefined) {
>   val extensionConfClassName = extensionConfOption.get
>   try {
> val extensionConfClass = 
> Utils.classForName(extensionConfClassName)
> val extensionConf = extensionConfClass.newInstance()
>   .asInstanceOf[SparkSessionExtensions => Unit]
> extensionConf(extensions)
>   } catch {
> // Ignore the error if we cannot find the class or when the class 
> has the wrong type.
> case e @ (_: ClassCastException |
>   _: ClassNotFoundException |
>   _: NoClassDefFoundError) =>
>   logWarning(s"Cannot use $extensionConfClassName to configure 
> session extensions.", e)
>   }
> }
> {code}
> I think a quick fix would be to use the getOrCreate method from the companion 
> object instead of calling the constructor from the SparkContext. Or we could 
> fix this by ensuring that all constructors attempt to pick up custom 
> extensions if they are set.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26518) UI Application Info Race Condition Can Throw NoSuchElement

2019-01-07 Thread Russell Spitzer (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16736091#comment-16736091
 ] 

Russell Spitzer commented on SPARK-26518:
-

Yeah I basically came to the same conclusion, there is no easy way to instead 
throw up a "Now loading" message with the current architecture. I think it's 
probably fine to leave it as is, we only noticed this because one of our tests 
was specifically expecting either "Not Available" or 200. Since this throws up 
a server side error instead we took note.

> UI Application Info Race Condition Can Throw NoSuchElement
> --
>
> Key: SPARK-26518
> URL: https://issues.apache.org/jira/browse/SPARK-26518
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Russell Spitzer
>Priority: Trivial
>
> There is a slight race condition in the 
> [AppStatusStore|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala#L39]
> Which calls `next` on the returned store even if it is empty which i can be 
> for a short period of time after the UI is up but before the store is 
> populated.
> {code}
> 
> 
> Error 500 Server Error
> 
> HTTP ERROR 500
> Problem accessing /jobs/. Reason:
> Server ErrorCaused 
> by:java.util.NoSuchElementException
> at java.util.Collections$EmptyIterator.next(Collections.java:4189)
> at 
> org.apache.spark.util.kvstore.InMemoryStore$InMemoryIterator.next(InMemoryStore.java:281)
> at 
> org.apache.spark.status.AppStatusStore.applicationInfo(AppStatusStore.scala:38)
> at org.apache.spark.ui.jobs.AllJobsPage.render(AllJobsPage.scala:275)
> at org.apache.spark.ui.WebUI$$anonfun$3.apply(WebUI.scala:86)
> at org.apache.spark.ui.WebUI$$anonfun$3.apply(WebUI.scala:86)
> at org.apache.spark.ui.JettyUtils$$anon$3.doGet(JettyUtils.scala:90)
> at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
> at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
> at 
> org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:865)
> at 
> org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:535)
> at 
> org.spark_project.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
> at 
> org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1317)
> at 
> org.spark_project.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
> at 
> org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)
> at 
> org.spark_project.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
> at 
> org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1219)
> at 
> org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
> at 
> org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:724)
> at 
> org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:219)
> at 
> org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
> at org.spark_project.jetty.server.Server.handle(Server.java:531)
> at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:352)
> at 
> org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)
> at 
> org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:281)
> at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:102)
> at 
> org.spark_project.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)
> at 
> org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:762)
> at 
> org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:680)
> at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26518) UI Application Info Race Condition Can Throw NoSuchElement

2019-01-02 Thread Russell Spitzer (JIRA)
Russell Spitzer created SPARK-26518:
---

 Summary: UI Application Info Race Condition Can Throw NoSuchElement
 Key: SPARK-26518
 URL: https://issues.apache.org/jira/browse/SPARK-26518
 Project: Spark
  Issue Type: Bug
  Components: Web UI
Affects Versions: 2.4.0, 2.3.0
Reporter: Russell Spitzer


There is a slight race condition in the 
[AppStatusStore|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala#L39]

Which calls `next` on the returned store even if it is empty which i can be for 
a short period of time after the UI is up but before the store is populated.

{code}


Error 500 Server Error

HTTP ERROR 500
Problem accessing /jobs/. Reason:
Server ErrorCaused 
by:java.util.NoSuchElementException
at java.util.Collections$EmptyIterator.next(Collections.java:4189)
at 
org.apache.spark.util.kvstore.InMemoryStore$InMemoryIterator.next(InMemoryStore.java:281)
at 
org.apache.spark.status.AppStatusStore.applicationInfo(AppStatusStore.scala:38)
at org.apache.spark.ui.jobs.AllJobsPage.render(AllJobsPage.scala:275)
at org.apache.spark.ui.WebUI$$anonfun$3.apply(WebUI.scala:86)
at org.apache.spark.ui.WebUI$$anonfun$3.apply(WebUI.scala:86)
at org.apache.spark.ui.JettyUtils$$anon$3.doGet(JettyUtils.scala:90)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at 
org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:865)
at 
org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:535)
at 
org.spark_project.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
at 
org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1317)
at 
org.spark_project.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
at 
org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)
at 
org.spark_project.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
at 
org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1219)
at 
org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
at 
org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:724)
at 
org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:219)
at 
org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
at org.spark_project.jetty.server.Server.handle(Server.java:531)
at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:352)
at 
org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)
at 
org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:281)
at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:102)
at 
org.spark_project.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)
at 
org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:762)
at 
org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:680)
at java.lang.Thread.run(Thread.java:748)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-25560) Allow Function Injection in SparkSessionExtensions

2018-09-27 Thread Russell Spitzer (JIRA)
Russell Spitzer created SPARK-25560:
---

 Summary: Allow Function Injection in SparkSessionExtensions
 Key: SPARK-25560
 URL: https://issues.apache.org/jira/browse/SPARK-25560
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core, SQL
Affects Versions: 2.4.0
Reporter: Russell Spitzer


Currently there is no way to add a set of external functions to all sessions 
made by users. We could add a small extension to SparkSessionExtensions which 
would allow this to be done. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25003) Pyspark Does not use Spark Sql Extensions

2018-08-03 Thread Russell Spitzer (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568415#comment-16568415
 ] 

Russell Spitzer commented on SPARK-25003:
-

[~holden.karau] , Wrote up a PR for each branch target since I'm not sure what 
version you would think best for the update. Please let me know if you have any 
feedback or advice on how to get an automatic test in :)

> Pyspark Does not use Spark Sql Extensions
> -
>
> Key: SPARK-25003
> URL: https://issues.apache.org/jira/browse/SPARK-25003
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.2, 2.3.1
>Reporter: Russell Spitzer
>Priority: Major
>
> When creating a SparkSession here
> [https://github.com/apache/spark/blob/v2.2.2/python/pyspark/sql/session.py#L216]
> {code:python}
> if jsparkSession is None:
>   jsparkSession = self._jvm.SparkSession(self._jsc.sc())
> self._jsparkSession = jsparkSession
> {code}
> I believe it ends up calling the constructor here
> https://github.com/apache/spark/blob/v2.2.2/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L85-L87
> {code:scala}
>   private[sql] def this(sc: SparkContext) {
> this(sc, None, None, new SparkSessionExtensions)
>   }
> {code}
> Which creates a new SparkSessionsExtensions object and does not pick up new 
> extensions that could have been set in the config like the companion 
> getOrCreate does.
> https://github.com/apache/spark/blob/v2.2.2/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L928-L944
> {code:scala}
> //in getOrCreate
> // Initialize extensions if the user has defined a configurator class.
> val extensionConfOption = 
> sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
> if (extensionConfOption.isDefined) {
>   val extensionConfClassName = extensionConfOption.get
>   try {
> val extensionConfClass = 
> Utils.classForName(extensionConfClassName)
> val extensionConf = extensionConfClass.newInstance()
>   .asInstanceOf[SparkSessionExtensions => Unit]
> extensionConf(extensions)
>   } catch {
> // Ignore the error if we cannot find the class or when the class 
> has the wrong type.
> case e @ (_: ClassCastException |
>   _: ClassNotFoundException |
>   _: NoClassDefFoundError) =>
>   logWarning(s"Cannot use $extensionConfClassName to configure 
> session extensions.", e)
>   }
> }
> {code}
> I think a quick fix would be to use the getOrCreate method from the companion 
> object instead of calling the constructor from the SparkContext. Or we could 
> fix this by ensuring that all constructors attempt to pick up custom 
> extensions if they are set.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-25003) Pyspark Does not use Spark Sql Extensions

2018-08-02 Thread Russell Spitzer (JIRA)
Russell Spitzer created SPARK-25003:
---

 Summary: Pyspark Does not use Spark Sql Extensions
 Key: SPARK-25003
 URL: https://issues.apache.org/jira/browse/SPARK-25003
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.3.1, 2.2.2
Reporter: Russell Spitzer


When creating a SparkSession here

[https://github.com/apache/spark/blob/v2.2.2/python/pyspark/sql/session.py#L216]
{code:python}
if jsparkSession is None:
  jsparkSession = self._jvm.SparkSession(self._jsc.sc())
self._jsparkSession = jsparkSession
{code}

I believe it ends up calling the constructor here
https://github.com/apache/spark/blob/v2.2.2/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L85-L87
{code:scala}
  private[sql] def this(sc: SparkContext) {
this(sc, None, None, new SparkSessionExtensions)
  }
{code}

Which creates a new SparkSessionsExtensions object and does not pick up new 
extensions that could have been set in the config like the companion 
getOrCreate does.
https://github.com/apache/spark/blob/v2.2.2/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L928-L944
{code:scala}
//in getOrCreate
// Initialize extensions if the user has defined a configurator class.
val extensionConfOption = 
sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
if (extensionConfOption.isDefined) {
  val extensionConfClassName = extensionConfOption.get
  try {
val extensionConfClass = Utils.classForName(extensionConfClassName)
val extensionConf = extensionConfClass.newInstance()
  .asInstanceOf[SparkSessionExtensions => Unit]
extensionConf(extensions)
  } catch {
// Ignore the error if we cannot find the class or when the class 
has the wrong type.
case e @ (_: ClassCastException |
  _: ClassNotFoundException |
  _: NoClassDefFoundError) =>
  logWarning(s"Cannot use $extensionConfClassName to configure 
session extensions.", e)
  }
}
{code}

I think a quick fix would be to use the getOrCreate method from the companion 
object instead of calling the constructor from the SparkContext. Or we could 
fix this by ensuring that all constructors attempt to pick up custom extensions 
if they are set.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21216) Streaming DataFrames fail to join with Hive tables

2018-07-27 Thread Russell Spitzer (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560034#comment-16560034
 ] 

Russell Spitzer commented on SPARK-21216:
-

For anyone else searching, this also fixes custom Spark Strategies added via 
spark.sql.extensions not being applied in a Structured Streaming Context.

> Streaming DataFrames fail to join with Hive tables
> --
>
> Key: SPARK-21216
> URL: https://issues.apache.org/jira/browse/SPARK-21216
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.1
>Reporter: Burak Yavuz
>Assignee: Burak Yavuz
>Priority: Major
> Fix For: 2.3.0
>
>
> The following code will throw a cryptic exception:
> {code}
> import org.apache.spark.sql.execution.streaming.MemoryStream
> import testImplicits._
> implicit val _sqlContext = spark.sqlContext
> Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", 
> "word").createOrReplaceTempView("t1")
> // Make a table and ensure it will be broadcast.
> sql("""CREATE TABLE smallTable(word string, number int)
>   |ROW FORMAT SERDE 
> 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
>   |STORED AS TEXTFILE
> """.stripMargin)
> sql(
>   """INSERT INTO smallTable
> |SELECT word, number from t1
>   """.stripMargin)
> val inputData = MemoryStream[Int]
> val joined = inputData.toDS().toDF()
>   .join(spark.table("smallTable"), $"value" === $"number")
> val sq = joined.writeStream
>   .format("memory")
>   .queryName("t2")
>   .start()
> try {
>   inputData.addData(1, 2)
>   sq.processAllAvailable()
> } finally {
>   sq.stop()
> }
> {code}
> If someone creates a HiveSession, the planner in `IncrementalExecution` 
> doesn't take into account the Hive scan strategies



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22976) Worker cleanup can remove running driver directories

2018-01-08 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317367#comment-16317367
 ] 

Russell Spitzer commented on SPARK-22976:
-

Made a PR against 2.0 but it's valid against all versions up to master

> Worker cleanup can remove running driver directories
> 
>
> Key: SPARK-22976
> URL: https://issues.apache.org/jira/browse/SPARK-22976
> Project: Spark
>  Issue Type: Bug
>  Components: Deploy, Spark Core
>Affects Versions: 1.0.2
>Reporter: Russell Spitzer
>
> Spark Standalone worker cleanup finds directories to remove with a listFiles 
> command
> This includes both application directories and driver directories from 
> cluster mode submitted applications. 
> A directory is considered to not be part of a running app if the worker does 
> not have an executor with a matching ID.
> https://github.com/apache/spark/blob/v2.2.1/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L432
> {code}
>   val appIds = executors.values.map(_.appId).toSet
>   val isAppStillRunning = appIds.contains(appIdFromDir)
> {code}
> If a driver has been started on a node, but all of the executors are on other 
> nodes, the worker running the driver will always assume that the driver 
> directory is not part of a running app.
> Consider a two node spark cluster with Worker A and Worker B where each node 
> has a single core available. We submit our application in deploy mode 
> cluster, the driver begins running on Worker A while the Executor starts on B.
> Worker A has a cleanup triggered and looks and finds it has a directory
> {code}
> /var/lib/spark/worker/driver-20180105234824-
> {code}
> Worker A check's it's executor list and finds no entries which match this 
> since it has no corresponding executors for this application. Worker A then 
> removes the directory even though it may still be actively running.
> I think this could be fixed by modifying line 432 to be
> {code}
>   val appIds = executors.values.map(_.appId).toSet ++ 
> drivers.values.map(_.driverId)
> {code}
> I'll run a test and submit a PR soon.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22976) Worker cleanup can remove running driver directories

2018-01-05 Thread Russell Spitzer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Spitzer updated SPARK-22976:

Description: 
Spark Standalone worker cleanup finds directories to remove with a listFiles 
command

This includes both application directories and driver directories from cluster 
mode submitted applications. 

A directory is considered to not be part of a running app if the worker does 
not have an executor with a matching ID.

https://github.com/apache/spark/blob/v2.2.1/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L432
{code}
  val appIds = executors.values.map(_.appId).toSet
  val isAppStillRunning = appIds.contains(appIdFromDir)
{code}


If a driver has been started on a node, but all of the executors are on other 
nodes, the worker running the driver will always assume that the driver 
directory is not part of a running app.

Consider a two node spark cluster with Worker A and Worker B where each node 
has a single core available. We submit our application in deploy mode cluster, 
the driver begins running on Worker A while the Executor starts on B.

Worker A has a cleanup triggered and looks and finds it has a directory
{code}
/var/lib/spark/worker/driver-20180105234824-
{code}

Worker A check's it's executor list and finds no entries which match this since 
it has no corresponding executors for this application. Worker A then removes 
the directory even though it may still be actively running.

I think this could be fixed by modifying line 432 to be
{code}
  val appIds = executors.values.map(_.appId).toSet ++ 
drivers.values.map(_.driverId)
{code}

I'll run a test and submit a PR soon.


  was:
Spark Standalone worker cleanup finds directories to remove with a listFiles 
command

This includes both application directories and driver directories from cluster 
mode submitted applications. 

A directory is considered to not be part of a running app if the worker does 
not have an executor with a matching ID.

https://github.com/apache/spark/blob/v2.2.1/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L432
{code}
  val appIds = executors.values.map(_.appId).toSet
  val isAppStillRunning = appIds.contains(appIdFromDir)
{code}


If a driver has been started on a node but all of the executors are on other 
workers the worker will always assume that the driver directory is not-running.

Consider a two node spark cluster with Worker A and Worker B where each node 
has a single core available. We submit our application in deploy mode cluster, 
the driver begins running on Worker A while the Executor starts on B.

Worker A has a cleanup triggered and looks and finds it has a directory
{code}
/var/lib/spark/worker/driver-20180105234824-
{code}

Worker A check's it's executor list and finds no entries which match this since 
it has no corresponding executors for this application. Worker A then removes 
the directory even though it may still be actively running.

I think this could be fixed by modifying line 432 to be
{code}
  val appIds = executors.values.map(_.appId).toSet ++ 
drivers.values.map(_.driverId)
{code}

I'll run a test and submit a PR soon.



> Worker cleanup can remove running driver directories
> 
>
> Key: SPARK-22976
> URL: https://issues.apache.org/jira/browse/SPARK-22976
> Project: Spark
>  Issue Type: Bug
>  Components: Deploy, Spark Core
>Affects Versions: 1.0.2
>Reporter: Russell Spitzer
>
> Spark Standalone worker cleanup finds directories to remove with a listFiles 
> command
> This includes both application directories and driver directories from 
> cluster mode submitted applications. 
> A directory is considered to not be part of a running app if the worker does 
> not have an executor with a matching ID.
> https://github.com/apache/spark/blob/v2.2.1/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L432
> {code}
>   val appIds = executors.values.map(_.appId).toSet
>   val isAppStillRunning = appIds.contains(appIdFromDir)
> {code}
> If a driver has been started on a node, but all of the executors are on other 
> nodes, the worker running the driver will always assume that the driver 
> directory is not part of a running app.
> Consider a two node spark cluster with Worker A and Worker B where each node 
> has a single core available. We submit our application in deploy mode 
> cluster, the driver begins running on Worker A while the Executor starts on B.
> Worker A has a cleanup triggered and looks and finds it has a directory
> {code}
> /var/lib/spark/worker/driver-20180105234824-
> {code}
> Worker A check's it's executor list and finds no entries which match this 
> since it has no corresponding executors for this application. Worker A then 
> removes the 

[jira] [Created] (SPARK-22976) Workerer cleanup can remove running driver directories

2018-01-05 Thread Russell Spitzer (JIRA)
Russell Spitzer created SPARK-22976:
---

 Summary: Workerer cleanup can remove running driver directories
 Key: SPARK-22976
 URL: https://issues.apache.org/jira/browse/SPARK-22976
 Project: Spark
  Issue Type: Bug
  Components: Deploy, Spark Core
Affects Versions: 1.0.2
Reporter: Russell Spitzer


Spark Standalone worker cleanup finds directories to remove with a listFiles 
command

This includes both application directories and driver directories from cluster 
mode submitted applications. 

A directory is considered to not be part of a running app if the worker does 
not have an executor with a matching ID.

https://github.com/apache/spark/blob/v2.2.1/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L432
{code}
  val appIds = executors.values.map(_.appId).toSet
  val isAppStillRunning = appIds.contains(appIdFromDir)
{code}


If a driver has been started on a node but all of the executors are on other 
workers the worker will always assume that the driver directory is not-running.

Consider a two node spark cluster with Worker A and Worker B where each node 
has a single core available. We submit our application in deploy mode cluster, 
the driver begins running on Worker A while the Executor starts on B.

Worker A has a cleanup triggered and looks and finds it has a directory
{code}
/var/lib/spark/worker/driver-20180105234824-
{code}

Worker A check's it's executor list and finds no entries which match this since 
it has no corresponding executors for this application. Worker A then removes 
the directory even though it may still be actively running.

I think this could be fixed by modifying line 432 to be
{code}
  val appIds = executors.values.map(_.appId).toSet ++ 
drivers.values.map(_.driverId)
{code}

I'll run a test and submit a PR soon.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22976) Worker cleanup can remove running driver directories

2018-01-05 Thread Russell Spitzer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Spitzer updated SPARK-22976:

Summary: Worker cleanup can remove running driver directories  (was: 
Workerer cleanup can remove running driver directories)

> Worker cleanup can remove running driver directories
> 
>
> Key: SPARK-22976
> URL: https://issues.apache.org/jira/browse/SPARK-22976
> Project: Spark
>  Issue Type: Bug
>  Components: Deploy, Spark Core
>Affects Versions: 1.0.2
>Reporter: Russell Spitzer
>
> Spark Standalone worker cleanup finds directories to remove with a listFiles 
> command
> This includes both application directories and driver directories from 
> cluster mode submitted applications. 
> A directory is considered to not be part of a running app if the worker does 
> not have an executor with a matching ID.
> https://github.com/apache/spark/blob/v2.2.1/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L432
> {code}
>   val appIds = executors.values.map(_.appId).toSet
>   val isAppStillRunning = appIds.contains(appIdFromDir)
> {code}
> If a driver has been started on a node but all of the executors are on other 
> workers the worker will always assume that the driver directory is 
> not-running.
> Consider a two node spark cluster with Worker A and Worker B where each node 
> has a single core available. We submit our application in deploy mode 
> cluster, the driver begins running on Worker A while the Executor starts on B.
> Worker A has a cleanup triggered and looks and finds it has a directory
> {code}
> /var/lib/spark/worker/driver-20180105234824-
> {code}
> Worker A check's it's executor list and finds no entries which match this 
> since it has no corresponding executors for this application. Worker A then 
> removes the directory even though it may still be actively running.
> I think this could be fixed by modifying line 432 to be
> {code}
>   val appIds = executors.values.map(_.appId).toSet ++ 
> drivers.values.map(_.driverId)
> {code}
> I'll run a test and submit a PR soon.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-15689) Data source API v2

2017-11-01 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16234899#comment-16234899
 ] 

Russell Spitzer edited comment on SPARK-15689 at 11/1/17 10:52 PM:
---

I think knowing whether or not the count was occurring at the time of the 
pushdown would solve this. So aggregate pushdown is probably the cleanest 
solution. 

We have a similar problem to Spark, we can handle the pushdown efficently if we 
know it's a count that can completely be handled by our filters but not if it 
isn't. Unfortunately we can see if we can satisfy all the filters when 
"unhandled filters" is called but don't know if the plan requires any 
additional columns. So really we would be ok if we just got the required 
"output" at that time. If we satisfy all the predicates and we know the output 
is empty we could handle the count pushdown. Having a specific Aggregate 
pushdown is probably cleaner though.


was (Author: rspitzer):
I think knowing whether or not the count was occurring at the time of the 
pushdown would solve this. So aggregate pushdown is probably the cleanest 
solution. 

We have a similar problem to Spark, we can handle the pushdown efficently if we 
know it's a count that completely be handled by our filters but not if it 
isn't. Unfortunately we can see if we can satisfy all the filters when 
"unhandled filters" is called but don't know if the plan requires any 
additional columns. So really we would be ok if we just got the required 
"output" at that time. If we satisfy all the predicates and we know the output 
is empty we could handle the count pushdown. Having a specific Aggregate 
pushdown is probably cleaner though.

> Data source API v2
> --
>
> Key: SPARK-15689
> URL: https://issues.apache.org/jira/browse/SPARK-15689
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Reynold Xin
>Assignee: Wenchen Fan
>Priority: Major
>  Labels: SPIP, releasenotes
> Attachments: SPIP Data Source API V2.pdf
>
>
> This ticket tracks progress in creating the v2 of data source API. This new 
> API should focus on:
> 1. Have a small surface so it is easy to freeze and maintain compatibility 
> for a long time. Ideally, this API should survive architectural rewrites and 
> user-facing API revamps of Spark.
> 2. Have a well-defined column batch interface for high performance. 
> Convenience methods should exist to convert row-oriented formats into column 
> batches for data source developers.
> 3. Still support filter push down, similar to the existing API.
> 4. Nice-to-have: support additional common operators, including limit and 
> sampling.
> Note that both 1 and 2 are problems that the current data source API (v1) 
> suffers. The current data source API has a wide surface with dependency on 
> DataFrame/SQLContext, making the data source API compatibility depending on 
> the upper level API. The current data source API is also only row oriented 
> and has to go through an expensive external data type conversion to internal 
> data type.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15689) Data source API v2

2017-11-01 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16234899#comment-16234899
 ] 

Russell Spitzer commented on SPARK-15689:
-

I think knowing whether or not the count was occurring at the time of the 
pushdown would solve this. So aggregate pushdown is probably the cleanest 
solution. 

We have a similar problem to Spark, we can handle the pushdown efficently if we 
know it's a count that completely be handled by our filters but not if it 
isn't. Unfortunately we can see if we can satisfy all the filters when 
"unhandled filters" is called but don't know if the plan requires any 
additional columns. So really we would be ok if we just got the required 
"output" at that time. If we satisfy all the predicates and we know the output 
is empty we could handle the count pushdown. Having a specific Aggregate 
pushdown is probably cleaner though.

> Data source API v2
> --
>
> Key: SPARK-15689
> URL: https://issues.apache.org/jira/browse/SPARK-15689
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Reynold Xin
>Assignee: Wenchen Fan
>Priority: Major
>  Labels: SPIP, releasenotes
> Attachments: SPIP Data Source API V2.pdf
>
>
> This ticket tracks progress in creating the v2 of data source API. This new 
> API should focus on:
> 1. Have a small surface so it is easy to freeze and maintain compatibility 
> for a long time. Ideally, this API should survive architectural rewrites and 
> user-facing API revamps of Spark.
> 2. Have a well-defined column batch interface for high performance. 
> Convenience methods should exist to convert row-oriented formats into column 
> batches for data source developers.
> 3. Still support filter push down, similar to the existing API.
> 4. Nice-to-have: support additional common operators, including limit and 
> sampling.
> Note that both 1 and 2 are problems that the current data source API (v1) 
> suffers. The current data source API has a wide surface with dependency on 
> DataFrame/SQLContext, making the data source API compatibility depending on 
> the upper level API. The current data source API is also only row oriented 
> and has to go through an expensive external data type conversion to internal 
> data type.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15689) Data source API v2

2017-11-01 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16234820#comment-16234820
 ] 

Russell Spitzer commented on SPARK-15689:
-

It does not, we can tell that a count (or similar request) occurs by seeing 
that no columns are requested from the underlying source. It the current api 
you can see this at the call to "buildScan" but unhandledFilters requires 
knowing the filtering without specifying the columns required.

> Data source API v2
> --
>
> Key: SPARK-15689
> URL: https://issues.apache.org/jira/browse/SPARK-15689
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Reynold Xin
>Assignee: Wenchen Fan
>Priority: Major
>  Labels: SPIP, releasenotes
> Attachments: SPIP Data Source API V2.pdf
>
>
> This ticket tracks progress in creating the v2 of data source API. This new 
> API should focus on:
> 1. Have a small surface so it is easy to freeze and maintain compatibility 
> for a long time. Ideally, this API should survive architectural rewrites and 
> user-facing API revamps of Spark.
> 2. Have a well-defined column batch interface for high performance. 
> Convenience methods should exist to convert row-oriented formats into column 
> batches for data source developers.
> 3. Still support filter push down, similar to the existing API.
> 4. Nice-to-have: support additional common operators, including limit and 
> sampling.
> Note that both 1 and 2 are problems that the current data source API (v1) 
> suffers. The current data source API has a wide surface with dependency on 
> DataFrame/SQLContext, making the data source API compatibility depending on 
> the upper level API. The current data source API is also only row oriented 
> and has to go through an expensive external data type conversion to internal 
> data type.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-15689) Data source API v2

2017-11-01 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16234640#comment-16234640
 ] 

Russell Spitzer edited comment on SPARK-15689 at 11/1/17 7:58 PM:
--

Something I just noticed, it may be helpful to also pass in "required columns" 
to the supportPushdownFilter. This could enable systems which can quickly 
estimate counts (but not actually materialize records) to respond to certain 
filters.

In my example system, I can pick out a small number of records past on an index 
very quickly, but as the number of records as proportion of the data increases 
the usefulness of using the pushdown decreases and eventually becomes a 
hinderance on performance. With counts in particular it is almost always 
beneficial to use the index since no rows are returned but I cannot tell if a 
count is being preformed from the base supportsPushdown add in since I cannot 
tell which columns are being requested. 


was (Author: rspitzer):
Something I just noticed, it may be helpful to also pass in "required columns" 
to the supportPushdownFilter. This could enable systems which can quickly 
estimate counts (but not actually materialize records) to respond to certain 
filters.

In my example system, I can pick out a small number of records past on an index 
very quickly, but as the number of records as proportion of the data increases 
the usefulness of using the pushdown decreases and eventually becomes a 
hinderance on performance. With counts in particular it is almost always 
beneficial to use the index since no rows are returned but I cannot tell if a 
count is being preformed from the base supportsPushdown add in. 

> Data source API v2
> --
>
> Key: SPARK-15689
> URL: https://issues.apache.org/jira/browse/SPARK-15689
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Reynold Xin
>Assignee: Wenchen Fan
>Priority: Major
>  Labels: SPIP, releasenotes
> Attachments: SPIP Data Source API V2.pdf
>
>
> This ticket tracks progress in creating the v2 of data source API. This new 
> API should focus on:
> 1. Have a small surface so it is easy to freeze and maintain compatibility 
> for a long time. Ideally, this API should survive architectural rewrites and 
> user-facing API revamps of Spark.
> 2. Have a well-defined column batch interface for high performance. 
> Convenience methods should exist to convert row-oriented formats into column 
> batches for data source developers.
> 3. Still support filter push down, similar to the existing API.
> 4. Nice-to-have: support additional common operators, including limit and 
> sampling.
> Note that both 1 and 2 are problems that the current data source API (v1) 
> suffers. The current data source API has a wide surface with dependency on 
> DataFrame/SQLContext, making the data source API compatibility depending on 
> the upper level API. The current data source API is also only row oriented 
> and has to go through an expensive external data type conversion to internal 
> data type.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15689) Data source API v2

2017-11-01 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16234640#comment-16234640
 ] 

Russell Spitzer commented on SPARK-15689:
-

Something I just noticed, it may be helpful to also pass in "required columns" 
to the supportPushdownFilter. This could enable systems which can quickly 
estimate counts (but not actually materialize records) to respond to certain 
filters.

In my example system, I can pick out a small number of records past on an index 
very quickly, but as the number of records as proportion of the data increases 
the usefulness of using the pushdown decreases and eventually becomes a 
hinderance on performance. With counts in particular it is almost always 
beneficial to use the index since no rows are returned but I cannot tell if a 
count is being preformed from the base supportsPushdown add in. 

> Data source API v2
> --
>
> Key: SPARK-15689
> URL: https://issues.apache.org/jira/browse/SPARK-15689
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Reynold Xin
>Assignee: Wenchen Fan
>Priority: Major
>  Labels: SPIP, releasenotes
> Attachments: SPIP Data Source API V2.pdf
>
>
> This ticket tracks progress in creating the v2 of data source API. This new 
> API should focus on:
> 1. Have a small surface so it is easy to freeze and maintain compatibility 
> for a long time. Ideally, this API should survive architectural rewrites and 
> user-facing API revamps of Spark.
> 2. Have a well-defined column batch interface for high performance. 
> Convenience methods should exist to convert row-oriented formats into column 
> batches for data source developers.
> 3. Still support filter push down, similar to the existing API.
> 4. Nice-to-have: support additional common operators, including limit and 
> sampling.
> Note that both 1 and 2 are problems that the current data source API (v1) 
> suffers. The current data source API has a wide surface with dependency on 
> DataFrame/SQLContext, making the data source API compatibility depending on 
> the upper level API. The current data source API is also only row oriented 
> and has to go through an expensive external data type conversion to internal 
> data type.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22316) Cannot Select ReducedAggregator Column

2017-10-24 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16217666#comment-16217666
 ] 

Russell Spitzer commented on SPARK-22316:
-

[~hvanhovell] This was the ticket I told you about :)

> Cannot Select ReducedAggregator Column
> --
>
> Key: SPARK-22316
> URL: https://issues.apache.org/jira/browse/SPARK-22316
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Russell Spitzer
>Priority: Minor
>
> Given a dataset which has been run through reduceGroups like this
> {code}
> case class Person(name: String, age: Int)
> case class Customer(id: Int, person: Person)
> val ds = spark.createDataset(Seq(Customer(1,Person("russ", 85
> val grouped = ds.groupByKey(c => c.id).reduceGroups( (x,y) => x )
> {code}
> We end up with a Dataset with the schema
> {code}
>  org.apache.spark.sql.types.StructType = 
> StructType(
>   StructField(value,IntegerType,false), 
>   StructField(ReduceAggregator(Customer),
> StructType(StructField(id,IntegerType,false),
> StructField(person,
>   StructType(StructField(name,StringType,true),
>   StructField(age,IntegerType,false))
>,true))
> ,true))
> {code}
> The column names are 
> {code}
> Array(value, ReduceAggregator(Customer))
> {code}
> But you cannot select the "ReduceAggregatorColumn"
> {code}
> grouped.select(grouped.columns(1))
> org.apache.spark.sql.AnalysisException: cannot resolve 
> '`ReduceAggregator(Customer)`' given input columns: [value, 
> ReduceAggregator(Customer)];;
> 'Project ['ReduceAggregator(Customer)]
> +- Aggregate [value#338], [value#338, 
> reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@5ada573, 
> Some(newInstance(class Customer)), Some(class Customer), 
> Some(StructType(StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true))), input[0, scala.Tuple2, true]._1 
> AS value#340, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) 
> null else named_struct(id, assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).id AS id#195, person, if 
> (isnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person)) null else named_struct(name, staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person).name, true), age, 
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person).age) AS person#196) AS _2#341, newInstance(class 
> scala.Tuple2), assertnotnull(assertnotnull(input[0, Customer, true])).id AS 
> id#195, if (isnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person)) null else named_struct(name, staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person).name, true), age, 
> assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person).age) AS person#196, StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true), true, 0, 0) AS 
> ReduceAggregator(Customer)#346]
>+- AppendColumns , class Customer, 
> [StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true)], newInstance(class Customer), 
> [input[0, int, false] AS value#338]
>   +- LocalRelation [id#197, person#198]
> {code}
> You can work around this by using "toDF" to rename the column
> {code}
> scala> grouped.toDF("key", "reduced").select("reduced")
> res55: org.apache.spark.sql.DataFrame = [reduced: struct struct>]
> {code}
> I think that all invocations of 
> {code}
> ds.select(ds.columns(i))
> {code}
> For all valid i < columns size should work.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22316) Cannot Select ReducedAggregator Column

2017-10-19 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211849#comment-16211849
 ] 

Russell Spitzer edited comment on SPARK-22316 at 10/19/17 10:21 PM:


Nope, it's not the parens ... I'm allowed to have columns with parens in this 
example
{code}
scala> val ds = spark.createDataset(1 to 10).toDF("ColumnWith(Parens)")
ds: org.apache.spark.sql.DataFrame = [ColumnWith(Parens): int]
scala> ds.filter(ds(ds.columns(0)) < 5) . show
{code}




was (Author: rspitzer):
Nope I'm allowed to have columns with parens in this example
{code}
scala> val ds = spark.createDataset(1 to 10).toDF("ColumnWith(Parens)")
ds: org.apache.spark.sql.DataFrame = [ColumnWith(Parens): int]
scala> ds.filter(ds(ds.columns(0)) < 5) . show
{code}



> Cannot Select ReducedAggregator Column
> --
>
> Key: SPARK-22316
> URL: https://issues.apache.org/jira/browse/SPARK-22316
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Russell Spitzer
>Priority: Minor
>
> Given a dataset which has been run through reduceGroups like this
> {code}
> case class Person(name: String, age: Int)
> case class Customer(id: Int, person: Person)
> val ds = spark.createDataset(Seq(Customer(1,Person("russ", 85
> val grouped = ds.groupByKey(c => c.id).reduceGroups( (x,y) => x )
> {code}
> We end up with a Dataset with the schema
> {code}
>  org.apache.spark.sql.types.StructType = 
> StructType(
>   StructField(value,IntegerType,false), 
>   StructField(ReduceAggregator(Customer),
> StructType(StructField(id,IntegerType,false),
> StructField(person,
>   StructType(StructField(name,StringType,true),
>   StructField(age,IntegerType,false))
>,true))
> ,true))
> {code}
> The column names are 
> {code}
> Array(value, ReduceAggregator(Customer))
> {code}
> But you cannot select the "ReduceAggregatorColumn"
> {code}
> grouped.select(grouped.columns(1))
> org.apache.spark.sql.AnalysisException: cannot resolve 
> '`ReduceAggregator(Customer)`' given input columns: [value, 
> ReduceAggregator(Customer)];;
> 'Project ['ReduceAggregator(Customer)]
> +- Aggregate [value#338], [value#338, 
> reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@5ada573, 
> Some(newInstance(class Customer)), Some(class Customer), 
> Some(StructType(StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true))), input[0, scala.Tuple2, true]._1 
> AS value#340, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) 
> null else named_struct(id, assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).id AS id#195, person, if 
> (isnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person)) null else named_struct(name, staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person).name, true), age, 
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person).age) AS person#196) AS _2#341, newInstance(class 
> scala.Tuple2), assertnotnull(assertnotnull(input[0, Customer, true])).id AS 
> id#195, if (isnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person)) null else named_struct(name, staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person).name, true), age, 
> assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person).age) AS person#196, StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true), true, 0, 0) AS 
> ReduceAggregator(Customer)#346]
>+- AppendColumns , class Customer, 
> [StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true)], newInstance(class Customer), 
> [input[0, int, false] AS value#338]
>   +- LocalRelation [id#197, person#198]
> {code}
> You can work around this by using "toDF" to rename the column
> {code}
> scala> grouped.toDF("key", "reduced").select("reduced")
> res55: org.apache.spark.sql.DataFrame = [reduced: struct struct>]
> {code}
> I think that all invocations of 
> {code}
> ds.select(ds.columns(i))
> {code}
> For all valid i < columns size should work.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22316) Cannot Select ReducedAggregator Column

2017-10-19 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211849#comment-16211849
 ] 

Russell Spitzer commented on SPARK-22316:
-

Nope I'm allowed to have columns with parens in this example
{code}
scala> val ds = spark.createDataset(1 to 10).toDF("ColumnWith(Parens)")
ds: org.apache.spark.sql.DataFrame = [ColumnWith(Parens): int]
scala> ds.filter(ds(ds.columns(0)) < 5) . show
{code}



> Cannot Select ReducedAggregator Column
> --
>
> Key: SPARK-22316
> URL: https://issues.apache.org/jira/browse/SPARK-22316
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Russell Spitzer
>Priority: Minor
>
> Given a dataset which has been run through reduceGroups like this
> {code}
> case class Person(name: String, age: Int)
> case class Customer(id: Int, person: Person)
> val ds = spark.createDataset(Seq(Customer(1,Person("russ", 85
> val grouped = ds.groupByKey(c => c.id).reduceGroups( (x,y) => x )
> {code}
> We end up with a Dataset with the schema
> {code}
>  org.apache.spark.sql.types.StructType = 
> StructType(
>   StructField(value,IntegerType,false), 
>   StructField(ReduceAggregator(Customer),
> StructType(StructField(id,IntegerType,false),
> StructField(person,
>   StructType(StructField(name,StringType,true),
>   StructField(age,IntegerType,false))
>,true))
> ,true))
> {code}
> The column names are 
> {code}
> Array(value, ReduceAggregator(Customer))
> {code}
> But you cannot select the "ReduceAggregatorColumn"
> {code}
> grouped.select(grouped.columns(1))
> org.apache.spark.sql.AnalysisException: cannot resolve 
> '`ReduceAggregator(Customer)`' given input columns: [value, 
> ReduceAggregator(Customer)];;
> 'Project ['ReduceAggregator(Customer)]
> +- Aggregate [value#338], [value#338, 
> reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@5ada573, 
> Some(newInstance(class Customer)), Some(class Customer), 
> Some(StructType(StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true))), input[0, scala.Tuple2, true]._1 
> AS value#340, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) 
> null else named_struct(id, assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).id AS id#195, person, if 
> (isnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person)) null else named_struct(name, staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person).name, true), age, 
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person).age) AS person#196) AS _2#341, newInstance(class 
> scala.Tuple2), assertnotnull(assertnotnull(input[0, Customer, true])).id AS 
> id#195, if (isnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person)) null else named_struct(name, staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person).name, true), age, 
> assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person).age) AS person#196, StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true), true, 0, 0) AS 
> ReduceAggregator(Customer)#346]
>+- AppendColumns , class Customer, 
> [StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true)], newInstance(class Customer), 
> [input[0, int, false] AS value#338]
>   +- LocalRelation [id#197, person#198]
> {code}
> You can work around this by using "toDF" to rename the column
> {code}
> scala> grouped.toDF("key", "reduced").select("reduced")
> res55: org.apache.spark.sql.DataFrame = [reduced: struct struct>]
> {code}
> I think that all invocations of 
> {code}
> ds.select(ds.columns(i))
> {code}
> For all valid i < columns size should work.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22316) Cannot Select ReducedAggregator Column

2017-10-19 Thread Russell Spitzer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Spitzer updated SPARK-22316:

Description: 
Given a dataset which has been run through reduceGroups like this
{code}
case class Person(name: String, age: Int)
case class Customer(id: Int, person: Person)
val ds = spark.createDataset(Seq(Customer(1,Person("russ", 85
val grouped = ds.groupByKey(c => c.id).reduceGroups( (x,y) => x )
{code}

We end up with a Dataset with the schema

{code}
 org.apache.spark.sql.types.StructType = 
StructType(
  StructField(value,IntegerType,false), 
  StructField(ReduceAggregator(Customer),
StructType(StructField(id,IntegerType,false),
StructField(person,
  StructType(StructField(name,StringType,true),
  StructField(age,IntegerType,false))
   ,true))
,true))
{code}

The column names are 
{code}
Array(value, ReduceAggregator(Customer))
{code}

But you cannot select the "ReduceAggregatorColumn"
{code}
grouped.select(grouped.columns(1))
org.apache.spark.sql.AnalysisException: cannot resolve 
'`ReduceAggregator(Customer)`' given input columns: [value, 
ReduceAggregator(Customer)];;
'Project ['ReduceAggregator(Customer)]
+- Aggregate [value#338], [value#338, 
reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@5ada573, 
Some(newInstance(class Customer)), Some(class Customer), 
Some(StructType(StructField(id,IntegerType,false), 
StructField(person,StructType(StructField(name,StringType,true), 
StructField(age,IntegerType,false)),true))), input[0, scala.Tuple2, true]._1 AS 
value#340, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) null 
else named_struct(id, assertnotnull(assertnotnull(input[0, scala.Tuple2, 
true]._2)).id AS id#195, person, if 
(isnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).person)) 
null else named_struct(name, staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
true]._2)).person).name, true), age, 
assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
true]._2)).person).age) AS person#196) AS _2#341, newInstance(class 
scala.Tuple2), assertnotnull(assertnotnull(input[0, Customer, true])).id AS 
id#195, if (isnull(assertnotnull(assertnotnull(input[0, Customer, 
true])).person)) null else named_struct(name, staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
true])).person).name, true), age, 
assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
true])).person).age) AS person#196, StructField(id,IntegerType,false), 
StructField(person,StructType(StructField(name,StringType,true), 
StructField(age,IntegerType,false)),true), true, 0, 0) AS 
ReduceAggregator(Customer)#346]
   +- AppendColumns , class Customer, 
[StructField(id,IntegerType,false), 
StructField(person,StructType(StructField(name,StringType,true), 
StructField(age,IntegerType,false)),true)], newInstance(class Customer), 
[input[0, int, false] AS value#338]
  +- LocalRelation [id#197, person#198]
{code}

You can work around this by using "toDF" to rename the column

{code}
scala> grouped.toDF("key", "reduced").select("reduced")
res55: org.apache.spark.sql.DataFrame = [reduced: struct>]
{code}

I think that all invocations of 
{code}
ds.select(ds.columns(i))
{code}
For all valid i < columns size should work.

  was:
Given a dataset which has been run through reduceGroups like this
{code}
case class Person(name: String, age: Int)
case class Customer(id: Int, person: Person)
val ds = spark.createDataset(Seq(Customer(1,Person("russ", 85)))
val grouped = ds.groupByKey(c => c.id).reduceGroups( (x,y) => x )
{code}

We end up with a Dataset with the schema

{code}
 org.apache.spark.sql.types.StructType = 
StructType(
  StructField(value,IntegerType,false), 
  StructField(ReduceAggregator(Customer),
StructType(StructField(id,IntegerType,false),
StructField(person,
  StructType(StructField(name,StringType,true),
  StructField(age,IntegerType,false))
   ,true))
,true))
{code}

The column names are 
{code}
Array(value, ReduceAggregator(Customer))
{code}

But you cannot select the "ReduceAggregatorColumn"
{code}
grouped.select(grouped.columns(1))
org.apache.spark.sql.AnalysisException: cannot resolve 
'`ReduceAggregator(Customer)`' given input columns: [value, 
ReduceAggregator(Customer)];;
'Project ['ReduceAggregator(Customer)]
+- Aggregate [value#338], [value#338, 
reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@5ada573, 
Some(newInstance(class Customer)), Some(class Customer), 
Some(StructType(StructField(id,IntegerType,false), 
StructField(person,StructType(StructField(name,StringType,true), 
StructField(age,IntegerType,false)),true))), input[0, scala.Tuple2, true]._1 AS 
value#340, if ((isnull(input[0, scala.Tuple2, 

[jira] [Updated] (SPARK-22316) Cannot Select ReducedAggregator Column

2017-10-19 Thread Russell Spitzer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Spitzer updated SPARK-22316:

Description: 
Given a dataset which has been run through reduceGroups like this
{code}
case class Person(name: String, age: Int)
case class Customer(id: Int, person: Person)
val ds = spark.createDataset(Seq(Customer(1,Person("russ", 85)))
val grouped = ds.groupByKey(c => c.id).reduceGroups( (x,y) => x )
{code}

We end up with a Dataset with the schema

{code}
 org.apache.spark.sql.types.StructType = 
StructType(
  StructField(value,IntegerType,false), 
  StructField(ReduceAggregator(Customer),
StructType(StructField(id,IntegerType,false),
StructField(person,
  StructType(StructField(name,StringType,true),
  StructField(age,IntegerType,false))
   ,true))
,true))
{code}

The column names are 
{code}
Array(value, ReduceAggregator(Customer))
{code}

But you cannot select the "ReduceAggregatorColumn"
{code}
grouped.select(grouped.columns(1))
org.apache.spark.sql.AnalysisException: cannot resolve 
'`ReduceAggregator(Customer)`' given input columns: [value, 
ReduceAggregator(Customer)];;
'Project ['ReduceAggregator(Customer)]
+- Aggregate [value#338], [value#338, 
reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@5ada573, 
Some(newInstance(class Customer)), Some(class Customer), 
Some(StructType(StructField(id,IntegerType,false), 
StructField(person,StructType(StructField(name,StringType,true), 
StructField(age,IntegerType,false)),true))), input[0, scala.Tuple2, true]._1 AS 
value#340, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) null 
else named_struct(id, assertnotnull(assertnotnull(input[0, scala.Tuple2, 
true]._2)).id AS id#195, person, if 
(isnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).person)) 
null else named_struct(name, staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
true]._2)).person).name, true), age, 
assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
true]._2)).person).age) AS person#196) AS _2#341, newInstance(class 
scala.Tuple2), assertnotnull(assertnotnull(input[0, Customer, true])).id AS 
id#195, if (isnull(assertnotnull(assertnotnull(input[0, Customer, 
true])).person)) null else named_struct(name, staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
true])).person).name, true), age, 
assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
true])).person).age) AS person#196, StructField(id,IntegerType,false), 
StructField(person,StructType(StructField(name,StringType,true), 
StructField(age,IntegerType,false)),true), true, 0, 0) AS 
ReduceAggregator(Customer)#346]
   +- AppendColumns , class Customer, 
[StructField(id,IntegerType,false), 
StructField(person,StructType(StructField(name,StringType,true), 
StructField(age,IntegerType,false)),true)], newInstance(class Customer), 
[input[0, int, false] AS value#338]
  +- LocalRelation [id#197, person#198]
{code}

You can work around this by using "toDF" to rename the column

{code}
scala> grouped.toDF("key", "reduced").select("reduced")
res55: org.apache.spark.sql.DataFrame = [reduced: struct>]
{code}

I think that all invocations of 
{code}
ds.select(ds.columns(i))
{code}
For all valid i < columns size should work.

  was:
Given a dataset which has been run through reduceGroups like this
{code}
case class Person(name: String, age: Int)
case class Customer(id: Int, person: Person)
val ds = spark.createDataFrame(Seq(Customer(1,Person("russ", 85)))
val grouped = ds.groupByKey(c => c.id).reduceGroups( (x,y) => x )
{code}

We end up with a Dataset with the schema

{code}
 org.apache.spark.sql.types.StructType = 
StructType(
  StructField(value,IntegerType,false), 
  StructField(ReduceAggregator(Customer),
StructType(StructField(id,IntegerType,false),
StructField(person,
  StructType(StructField(name,StringType,true),
  StructField(age,IntegerType,false))
   ,true))
,true))
{code}

The column names are 
{code}
Array(value, ReduceAggregator(Customer))
{code}

But you cannot select the "ReduceAggregatorColumn"
{code}
grouped.select(grouped.columns(1))
org.apache.spark.sql.AnalysisException: cannot resolve 
'`ReduceAggregator(Customer)`' given input columns: [value, 
ReduceAggregator(Customer)];;
'Project ['ReduceAggregator(Customer)]
+- Aggregate [value#338], [value#338, 
reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@5ada573, 
Some(newInstance(class Customer)), Some(class Customer), 
Some(StructType(StructField(id,IntegerType,false), 
StructField(person,StructType(StructField(name,StringType,true), 
StructField(age,IntegerType,false)),true))), input[0, scala.Tuple2, true]._1 AS 
value#340, if ((isnull(input[0, 

[jira] [Commented] (SPARK-22316) Cannot Select ReducedAggregator Column

2017-10-19 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211846#comment-16211846
 ] 

Russell Spitzer commented on SPARK-22316:
-

You are right, i meant to have createDataset up there. I"ll modify the example.

This shouldn't be Dataset specific since the underlying issue isn't really 
dependent on the encoders, it's just that the name that is automatically made 
cannot be used to select the column. After all a Dataframe is just a 
Dataset[Row] :). 

I haven't really looked into this but i'm guessing it's the "(" characters in 
the auto-generated column name.

Here is an example with Dataframes for good measure
{code}
case class Person(name: String, age: Int)
case class Customer(id: Int, person: Person)
val ds = spark.createDataFrame(Seq(Customer(1,Person("russ", 85
val grouped = ds.groupByKey(c => c.getInt(0)).reduceGroups( (x,y) => x )
grouped(grouped.columns(1))
/**org.apache.spark.sql.AnalysisException: Cannot resolve column name 
"ReduceAggregator(org.apache.spark.sql.Row)" among (value, 
ReduceAggregator(org.apache.spark.sql.Row));
**/
{code}






> Cannot Select ReducedAggregator Column
> --
>
> Key: SPARK-22316
> URL: https://issues.apache.org/jira/browse/SPARK-22316
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Russell Spitzer
>Priority: Minor
>
> Given a dataset which has been run through reduceGroups like this
> {code}
> case class Person(name: String, age: Int)
> case class Customer(id: Int, person: Person)
> val ds = spark.createDataFrame(Seq(Customer(1,Person("russ", 85)))
> val grouped = ds.groupByKey(c => c.id).reduceGroups( (x,y) => x )
> {code}
> We end up with a Dataset with the schema
> {code}
>  org.apache.spark.sql.types.StructType = 
> StructType(
>   StructField(value,IntegerType,false), 
>   StructField(ReduceAggregator(Customer),
> StructType(StructField(id,IntegerType,false),
> StructField(person,
>   StructType(StructField(name,StringType,true),
>   StructField(age,IntegerType,false))
>,true))
> ,true))
> {code}
> The column names are 
> {code}
> Array(value, ReduceAggregator(Customer))
> {code}
> But you cannot select the "ReduceAggregatorColumn"
> {code}
> grouped.select(grouped.columns(1))
> org.apache.spark.sql.AnalysisException: cannot resolve 
> '`ReduceAggregator(Customer)`' given input columns: [value, 
> ReduceAggregator(Customer)];;
> 'Project ['ReduceAggregator(Customer)]
> +- Aggregate [value#338], [value#338, 
> reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@5ada573, 
> Some(newInstance(class Customer)), Some(class Customer), 
> Some(StructType(StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true))), input[0, scala.Tuple2, true]._1 
> AS value#340, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) 
> null else named_struct(id, assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).id AS id#195, person, if 
> (isnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person)) null else named_struct(name, staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person).name, true), age, 
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person).age) AS person#196) AS _2#341, newInstance(class 
> scala.Tuple2), assertnotnull(assertnotnull(input[0, Customer, true])).id AS 
> id#195, if (isnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person)) null else named_struct(name, staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person).name, true), age, 
> assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person).age) AS person#196, StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true), true, 0, 0) AS 
> ReduceAggregator(Customer)#346]
>+- AppendColumns , class Customer, 
> [StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true)], newInstance(class Customer), 
> [input[0, int, false] AS value#338]
>   +- LocalRelation [id#197, person#198]
> {code}
> You can work around this by using "toDF" to rename the column
> {code}
> scala> grouped.toDF("key", "reduced").select("reduced")
> res55: org.apache.spark.sql.DataFrame = [reduced: struct struct>]
> {code}
> I think that all invocations of 
> {code}
> ds.select(ds.columns(i))
> {code}
> For all 

[jira] [Comment Edited] (SPARK-22316) Cannot Select ReducedAggregator Column

2017-10-19 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211528#comment-16211528
 ] 

Russell Spitzer edited comment on SPARK-22316 at 10/19/17 6:49 PM:
---

I can imagine many reasons I might want to access a column in a DataFrame. 

Lets say I want to add a new column based on the property of my reduced group
{code}
scala> grouped.withColumn("newId", grouped(grouped.columns(1)))
org.apache.spark.sql.AnalysisException: Cannot resolve column name 
"ReduceAggregator(Customer)" among (value, ReduceAggregator(Customer));
{code}

Or filter based on the results
{code}
scala> grouped.filter(grouped(grouped.columns(1)) < 5)
org.apache.spark.sql.AnalysisException: Cannot resolve column name 
"ReduceAggregator(Customer)" among (value, ReduceAggregator(Customer));
{code}

or to use the literal select function to expand the Struct which gives a 
slightly different 
error
{code}
scala> grouped.select("ReduceAggregator(Customer).*")
org.apache.spark.sql.AnalysisException: cannot resolve 
'ReduceAggregator(Customer).*' give input columns 'id, person, value';
{code}

I should when I say "select" I mean actually access the column by its name for 
any purpose.





was (Author: rspitzer):
I can imagine many reasons i might want to access a column in a dataframe. 

Lets say I want to add a new column based on the property of my reduced group
{code}
scala> grouped.withColumn("newId", grouped(grouped.columns(1)))
org.apache.spark.sql.AnalysisException: Cannot resolve column name 
"ReduceAggregator(Customer)" among (value, ReduceAggregator(Customer));
{code}

Or filter based on the results
{code}
scala> grouped.filter(grouped(grouped.columns(1)) < 5)
org.apache.spark.sql.AnalysisException: Cannot resolve column name 
"ReduceAggregator(Customer)" among (value, ReduceAggregator(Customer));
{code}

or to use the literal select function to expand the Struct which gives a 
slightly different 
error
{code}
scala> grouped.select("ReduceAggregator(Customer).*")
org.apache.spark.sql.AnalysisException: cannot resolve 
'ReduceAggregator(Customer).*' give input columns 'id, person, value';
{code}

I should when i say "select" I mean actually access the column by it's name for 
any purpose.




> Cannot Select ReducedAggregator Column
> --
>
> Key: SPARK-22316
> URL: https://issues.apache.org/jira/browse/SPARK-22316
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Russell Spitzer
>Priority: Minor
>
> Given a dataset which has been run through reduceGroups like this
> {code}
> case class Person(name: String, age: Int)
> case class Customer(id: Int, person: Person)
> val ds = spark.createDataFrame(Seq(Customer(1,Person("russ", 85)))
> val grouped = ds.groupByKey(c => c.id).reduceGroups( (x,y) => x )
> {code}
> We end up with a Dataset with the schema
> {code}
>  org.apache.spark.sql.types.StructType = 
> StructType(
>   StructField(value,IntegerType,false), 
>   StructField(ReduceAggregator(Customer),
> StructType(StructField(id,IntegerType,false),
> StructField(person,
>   StructType(StructField(name,StringType,true),
>   StructField(age,IntegerType,false))
>,true))
> ,true))
> {code}
> The column names are 
> {code}
> Array(value, ReduceAggregator(Customer))
> {code}
> But you cannot select the "ReduceAggregatorColumn"
> {code}
> grouped.select(grouped.columns(1))
> org.apache.spark.sql.AnalysisException: cannot resolve 
> '`ReduceAggregator(Customer)`' given input columns: [value, 
> ReduceAggregator(Customer)];;
> 'Project ['ReduceAggregator(Customer)]
> +- Aggregate [value#338], [value#338, 
> reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@5ada573, 
> Some(newInstance(class Customer)), Some(class Customer), 
> Some(StructType(StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true))), input[0, scala.Tuple2, true]._1 
> AS value#340, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) 
> null else named_struct(id, assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).id AS id#195, person, if 
> (isnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person)) null else named_struct(name, staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person).name, true), age, 
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person).age) AS person#196) AS _2#341, newInstance(class 
> scala.Tuple2), assertnotnull(assertnotnull(input[0, Customer, true])).id AS 
> id#195, if (isnull(assertnotnull(assertnotnull(input[0, Customer, 
> 

[jira] [Comment Edited] (SPARK-22316) Cannot Select ReducedAggregator Column

2017-10-19 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211528#comment-16211528
 ] 

Russell Spitzer edited comment on SPARK-22316 at 10/19/17 6:46 PM:
---

I can imagine many reasons i might want to access a column in a dataframe. 

Lets say I want to add a new column based on the property of my reduced group
{code}
scala> grouped.withColumn("newId", grouped(grouped.columns(1)))
org.apache.spark.sql.AnalysisException: Cannot resolve column name 
"ReduceAggregator(Customer)" among (value, ReduceAggregator(Customer));
{code}

Or filter based on the results
{code}
scala> grouped.filter(grouped(grouped.columns(1)) < 5)
org.apache.spark.sql.AnalysisException: Cannot resolve column name 
"ReduceAggregator(Customer)" among (value, ReduceAggregator(Customer));
{code}

or to use the literal select function to expand the Struct which gives a 
slightly different 
error
{code}
scala> grouped.select("ReduceAggregator(Customer).*")
org.apache.spark.sql.AnalysisException: cannot resolve 
'ReduceAggregator(Customer).*' give input columns 'id, person, value';
{code}

I should when i say "select" I mean actually access the column by it's name for 
any purpose.





was (Author: rspitzer):
I can imagine many reasons i might want to access a column in a dataframe. 

Lets say I want to add a new column based on the property of my reduced group
{code}
scala> grouped.withColumn("newId", grouped(grouped.columns(1)))
org.apache.spark.sql.AnalysisException: Cannot resolve column name 
"ReduceAggregator(Customer)" among (value, ReduceAggregator(Customer));
{code}

Or filter based on the results
{code}
scala> grouped.filter(grouped(grouped.columns(1)) < 5)
org.apache.spark.sql.AnalysisException: Cannot resolve column name 
"ReduceAggregator(Customer)" among (value, ReduceAggregator(Customer));
{code}

I should when i say "select" I mean actually access the column by it's name for 
any purpose.




> Cannot Select ReducedAggregator Column
> --
>
> Key: SPARK-22316
> URL: https://issues.apache.org/jira/browse/SPARK-22316
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Russell Spitzer
>Priority: Minor
>
> Given a dataset which has been run through reduceGroups like this
> {code}
> case class Person(name: String, age: Int)
> case class Customer(id: Int, person: Person)
> val ds = spark.createDataFrame(Seq(Customer(1,Person("russ", 85)))
> val grouped = ds.groupByKey(c => c.id).reduceGroups( (x,y) => x )
> {code}
> We end up with a Dataset with the schema
> {code}
>  org.apache.spark.sql.types.StructType = 
> StructType(
>   StructField(value,IntegerType,false), 
>   StructField(ReduceAggregator(Customer),
> StructType(StructField(id,IntegerType,false),
> StructField(person,
>   StructType(StructField(name,StringType,true),
>   StructField(age,IntegerType,false))
>,true))
> ,true))
> {code}
> The column names are 
> {code}
> Array(value, ReduceAggregator(Customer))
> {code}
> But you cannot select the "ReduceAggregatorColumn"
> {code}
> grouped.select(grouped.columns(1))
> org.apache.spark.sql.AnalysisException: cannot resolve 
> '`ReduceAggregator(Customer)`' given input columns: [value, 
> ReduceAggregator(Customer)];;
> 'Project ['ReduceAggregator(Customer)]
> +- Aggregate [value#338], [value#338, 
> reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@5ada573, 
> Some(newInstance(class Customer)), Some(class Customer), 
> Some(StructType(StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true))), input[0, scala.Tuple2, true]._1 
> AS value#340, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) 
> null else named_struct(id, assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).id AS id#195, person, if 
> (isnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person)) null else named_struct(name, staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person).name, true), age, 
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person).age) AS person#196) AS _2#341, newInstance(class 
> scala.Tuple2), assertnotnull(assertnotnull(input[0, Customer, true])).id AS 
> id#195, if (isnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person)) null else named_struct(name, staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person).name, true), age, 
> assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 

[jira] [Commented] (SPARK-22316) Cannot Select ReducedAggregator Column

2017-10-19 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211528#comment-16211528
 ] 

Russell Spitzer commented on SPARK-22316:
-

I can imagine many reasons i might want to access a column in a dataframe. 

Lets say I want to add a new column based on the property of my reduced group
{code}
scala> grouped.withColumn("newId", grouped(grouped.columns(1)))
org.apache.spark.sql.AnalysisException: Cannot resolve column name 
"ReduceAggregator(Customer)" among (value, ReduceAggregator(Customer));
{code}

Or filter based on the results
{code}
scala> grouped.filter(grouped(grouped.columns(1)) < 5)
org.apache.spark.sql.AnalysisException: Cannot resolve column name 
"ReduceAggregator(Customer)" among (value, ReduceAggregator(Customer));
{code}

I should when i say "select" I mean actually access the column by it's name for 
any purpose.




> Cannot Select ReducedAggregator Column
> --
>
> Key: SPARK-22316
> URL: https://issues.apache.org/jira/browse/SPARK-22316
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Russell Spitzer
>Priority: Minor
>
> Given a dataset which has been run through reduceGroups like this
> {code}
> case class Person(name: String, age: Int)
> case class Customer(id: Int, person: Person)
> val ds = spark.createDataFrame(Seq(Customer(1,Person("russ", 85)))
> val grouped = ds.groupByKey(c => c.id).reduceGroups( (x,y) => x )
> {code}
> We end up with a Dataset with the schema
> {code}
>  org.apache.spark.sql.types.StructType = 
> StructType(
>   StructField(value,IntegerType,false), 
>   StructField(ReduceAggregator(Customer),
> StructType(StructField(id,IntegerType,false),
> StructField(person,
>   StructType(StructField(name,StringType,true),
>   StructField(age,IntegerType,false))
>,true))
> ,true))
> {code}
> The column names are 
> {code}
> Array(value, ReduceAggregator(Customer))
> {code}
> But you cannot select the "ReduceAggregatorColumn"
> {code}
> grouped.select(grouped.columns(1))
> org.apache.spark.sql.AnalysisException: cannot resolve 
> '`ReduceAggregator(Customer)`' given input columns: [value, 
> ReduceAggregator(Customer)];;
> 'Project ['ReduceAggregator(Customer)]
> +- Aggregate [value#338], [value#338, 
> reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@5ada573, 
> Some(newInstance(class Customer)), Some(class Customer), 
> Some(StructType(StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true))), input[0, scala.Tuple2, true]._1 
> AS value#340, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) 
> null else named_struct(id, assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).id AS id#195, person, if 
> (isnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person)) null else named_struct(name, staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person).name, true), age, 
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person).age) AS person#196) AS _2#341, newInstance(class 
> scala.Tuple2), assertnotnull(assertnotnull(input[0, Customer, true])).id AS 
> id#195, if (isnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person)) null else named_struct(name, staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person).name, true), age, 
> assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person).age) AS person#196, StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true), true, 0, 0) AS 
> ReduceAggregator(Customer)#346]
>+- AppendColumns , class Customer, 
> [StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true)], newInstance(class Customer), 
> [input[0, int, false] AS value#338]
>   +- LocalRelation [id#197, person#198]
> {code}
> You can work around this by using "toDF" to rename the column
> {code}
> scala> grouped.toDF("key", "reduced").select("reduced")
> res55: org.apache.spark.sql.DataFrame = [reduced: struct struct>]
> {code}
> I think that all invocations of 
> {code}
> ds.select(ds.columns(i))
> {code}
> For all valid i < columns size should work.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org

[jira] [Updated] (SPARK-22316) Cannot Select ReducedAggregator Column

2017-10-19 Thread Russell Spitzer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Spitzer updated SPARK-22316:

Summary: Cannot Select ReducedAggregator Column  (was: Cannot Select 
ReducedAggreagtor Column)

> Cannot Select ReducedAggregator Column
> --
>
> Key: SPARK-22316
> URL: https://issues.apache.org/jira/browse/SPARK-22316
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Russell Spitzer
>Priority: Minor
>
> Given a dataset which has been run through reduceGroups like this
> {code}
> case class Person(name: String, age: Int)
> case class Customer(id: Int, person: Person)
> val ds = spark.createDataFrame(Seq(Customer(1,Person("russ", 85)))
> val grouped = ds.groupByKey(c => c.id).reduceGroups( (x,y) => x )
> {code}
> We end up with a Dataset with the schema
> {code}
>  org.apache.spark.sql.types.StructType = 
> StructType(
>   StructField(value,IntegerType,false), 
>   StructField(ReduceAggregator(Customer),
> StructType(StructField(id,IntegerType,false),
> StructField(person,
>   StructType(StructField(name,StringType,true),
>   StructField(age,IntegerType,false))
>,true))
> ,true))
> {code}
> The column names are 
> {code}
> Array(value, ReduceAggregator(Customer))
> {code}
> But you cannot select the "ReduceAggregatorColumn"
> {code}
> grouped.select(grouped.columns(1))
> org.apache.spark.sql.AnalysisException: cannot resolve 
> '`ReduceAggregator(Customer)`' given input columns: [value, 
> ReduceAggregator(Customer)];;
> 'Project ['ReduceAggregator(Customer)]
> +- Aggregate [value#338], [value#338, 
> reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@5ada573, 
> Some(newInstance(class Customer)), Some(class Customer), 
> Some(StructType(StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true))), input[0, scala.Tuple2, true]._1 
> AS value#340, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) 
> null else named_struct(id, assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).id AS id#195, person, if 
> (isnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person)) null else named_struct(name, staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person).name, true), age, 
> assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
> true]._2)).person).age) AS person#196) AS _2#341, newInstance(class 
> scala.Tuple2), assertnotnull(assertnotnull(input[0, Customer, true])).id AS 
> id#195, if (isnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person)) null else named_struct(name, staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person).name, true), age, 
> assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
> true])).person).age) AS person#196, StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true), true, 0, 0) AS 
> ReduceAggregator(Customer)#346]
>+- AppendColumns , class Customer, 
> [StructField(id,IntegerType,false), 
> StructField(person,StructType(StructField(name,StringType,true), 
> StructField(age,IntegerType,false)),true)], newInstance(class Customer), 
> [input[0, int, false] AS value#338]
>   +- LocalRelation [id#197, person#198]
> {code}
> You can work around this by using "toDF" to rename the column
> {code}
> scala> grouped.toDF("key", "reduced").select("reduced")
> res55: org.apache.spark.sql.DataFrame = [reduced: struct struct>]
> {code}
> I think that all invocations of 
> {code}
> ds.select(ds.columns(i))
> {code}
> For all valid i < columns size should work.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22316) Cannot Select ReducedAggreagtor Column

2017-10-19 Thread Russell Spitzer (JIRA)
Russell Spitzer created SPARK-22316:
---

 Summary: Cannot Select ReducedAggreagtor Column
 Key: SPARK-22316
 URL: https://issues.apache.org/jira/browse/SPARK-22316
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0
Reporter: Russell Spitzer
Priority: Minor


Given a dataset which has been run through reduceGroups like this
{code}
case class Person(name: String, age: Int)
case class Customer(id: Int, person: Person)
val ds = spark.createDataFrame(Seq(Customer(1,Person("russ", 85)))
val grouped = ds.groupByKey(c => c.id).reduceGroups( (x,y) => x )
{code}

We end up with a Dataset with the schema

{code}
 org.apache.spark.sql.types.StructType = 
StructType(
  StructField(value,IntegerType,false), 
  StructField(ReduceAggregator(Customer),
StructType(StructField(id,IntegerType,false),
StructField(person,
  StructType(StructField(name,StringType,true),
  StructField(age,IntegerType,false))
   ,true))
,true))
{code}

The column names are 
{code}
Array(value, ReduceAggregator(Customer))
{code}

But you cannot select the "ReduceAggregatorColumn"
{code}
grouped.select(grouped.columns(1))
org.apache.spark.sql.AnalysisException: cannot resolve 
'`ReduceAggregator(Customer)`' given input columns: [value, 
ReduceAggregator(Customer)];;
'Project ['ReduceAggregator(Customer)]
+- Aggregate [value#338], [value#338, 
reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@5ada573, 
Some(newInstance(class Customer)), Some(class Customer), 
Some(StructType(StructField(id,IntegerType,false), 
StructField(person,StructType(StructField(name,StringType,true), 
StructField(age,IntegerType,false)),true))), input[0, scala.Tuple2, true]._1 AS 
value#340, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) null 
else named_struct(id, assertnotnull(assertnotnull(input[0, scala.Tuple2, 
true]._2)).id AS id#195, person, if 
(isnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).person)) 
null else named_struct(name, staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
true]._2)).person).name, true), age, 
assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, 
true]._2)).person).age) AS person#196) AS _2#341, newInstance(class 
scala.Tuple2), assertnotnull(assertnotnull(input[0, Customer, true])).id AS 
id#195, if (isnull(assertnotnull(assertnotnull(input[0, Customer, 
true])).person)) null else named_struct(name, staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
true])).person).name, true), age, 
assertnotnull(assertnotnull(assertnotnull(input[0, Customer, 
true])).person).age) AS person#196, StructField(id,IntegerType,false), 
StructField(person,StructType(StructField(name,StringType,true), 
StructField(age,IntegerType,false)),true), true, 0, 0) AS 
ReduceAggregator(Customer)#346]
   +- AppendColumns , class Customer, 
[StructField(id,IntegerType,false), 
StructField(person,StructType(StructField(name,StringType,true), 
StructField(age,IntegerType,false)),true)], newInstance(class Customer), 
[input[0, int, false] AS value#338]
  +- LocalRelation [id#197, person#198]
{code}

You can work around this by using "toDF" to rename the column

{code}
scala> grouped.toDF("key", "reduced").select("reduced")
res55: org.apache.spark.sql.DataFrame = [reduced: struct>]
{code}

I think that all invocations of 
{code}
ds.select(ds.columns(i))
{code}
For all valid i < columns size should work.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15689) Data source API v2

2017-08-17 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130355#comment-16130355
 ] 

Russell Spitzer commented on SPARK-15689:
-

Thanks [~cloud_fan] for posting the design doc it was a great read and I like a 
lot of the direction this is going in. It would helpful if we could have access 
to the doc as a google doc or some other editable/comment-able form though to 
encourage discussion.

I left some comments on the prototype but one thing I think could be a great 
addition would be a joinInterface. I ended up writing up one of these 
specifically for Cassandra and had to do a lot of plumbing to get it to fit 
into the rest of the Catalyst ecosystem so I think this would be a great time 
to plan ahead in Spark design. 

The join interface would look a lot like a combination of the read and write 
apis, given a row input and a set of expressions the relationship should return 
rows that match those expressions OR fallback to just being a read relationship 
if none of the expressions can be satisfied by the join (leaving all the 
expressions to be evaluated in spark). 

> Data source API v2
> --
>
> Key: SPARK-15689
> URL: https://issues.apache.org/jira/browse/SPARK-15689
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Reynold Xin
>  Labels: releasenotes
> Attachments: SPIP Data Source API V2.pdf
>
>
> This ticket tracks progress in creating the v2 of data source API. This new 
> API should focus on:
> 1. Have a small surface so it is easy to freeze and maintain compatibility 
> for a long time. Ideally, this API should survive architectural rewrites and 
> user-facing API revamps of Spark.
> 2. Have a well-defined column batch interface for high performance. 
> Convenience methods should exist to convert row-oriented formats into column 
> batches for data source developers.
> 3. Still support filter push down, similar to the existing API.
> 4. Nice-to-have: support additional common operators, including limit and 
> sampling.
> Note that both 1 and 2 are problems that the current data source API (v1) 
> suffers. The current data source API has a wide surface with dependency on 
> DataFrame/SQLContext, making the data source API compatibility depending on 
> the upper level API. The current data source API is also only row oriented 
> and has to go through an expensive external data type conversion to internal 
> data type.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15689) Data source API v2

2017-06-16 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16052321#comment-16052321
 ] 

Russell Spitzer commented on SPARK-15689:
-

I've been trying to work with making Catalyst Cassandra partitioning aware. 
There seem to be two major blocks on this.

The first is that DataSourceScanExec is unable to learn what the underlying 
partitioning should be from the BaseRelation it comes from. I'm currently able 
to get around this by using the DataSourceStrategy plan and then transforming 
the resultant DataSourceScanExec.

The second is that the Partitioning trait is sealed. I want to define a new 
partitioning which is Clustered but is not hashed based on certain columns. It 
would look almost identical to the HashPartitioning class except the
expression which returns a valid PartitionID given expressions would be 
different. 

So for V2 I would really like the ability to specify the physical partitioning 
and as well be able to define new custom partitioning. 

> Data source API v2
> --
>
> Key: SPARK-15689
> URL: https://issues.apache.org/jira/browse/SPARK-15689
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Reynold Xin
>  Labels: releasenotes
>
> This ticket tracks progress in creating the v2 of data source API. This new 
> API should focus on:
> 1. Have a small surface so it is easy to freeze and maintain compatibility 
> for a long time. Ideally, this API should survive architectural rewrites and 
> user-facing API revamps of Spark.
> 2. Have a well-defined column batch interface for high performance. 
> Convenience methods should exist to convert row-oriented formats into column 
> batches for data source developers.
> 3. Still support filter push down, similar to the existing API.
> 4. Nice-to-have: support additional common operators, including limit and 
> sampling.
> Note that both 1 and 2 are problems that the current data source API (v1) 
> suffers. The current data source API has a wide surface with dependency on 
> DataFrame/SQLContext, making the data source API compatibility depending on 
> the upper level API. The current data source API is also only row oriented 
> and has to go through an expensive external data type conversion to internal 
> data type.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-18851) DataSet Limit into Aggregate Results in NPE in Codegen

2016-12-13 Thread Russell Spitzer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Spitzer resolved SPARK-18851.
-
Resolution: Duplicate

> DataSet Limit into Aggregate Results in NPE in Codegen
> --
>
> Key: SPARK-18851
> URL: https://issues.apache.org/jira/browse/SPARK-18851
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: regresion
>
> Performing a limit and then an aggregate seems to generate NPE vulnerable 
> code.
> Simple example here
> {code}
> case class ABCD ( a: Int, b:Int , c:Int, d: Option[Int])
> val ds = sc.parallelize(1 to 50).map( i => ABCD(i, i, i, if (i%2==0) None 
> else Some(i))).toDS
> ds.limit(5).distinct.show
> {code}
> {code}
> 16/12/13 14:02:37 ERROR Executor: Exception in task 0.0 in stage 31.0 (TID 
> 597)
> java.lang.NullPointerException
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 16/12/13 14:02:37 WARN TaskSetManager: Lost task 0.0 in stage 31.0 (TID 597, 
> localhost): java.lang.NullPointerException
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 16/12/13 14:02:37 ERROR TaskSetManager: Task 0 in stage 31.0 failed 1 times; 
> aborting job
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
> stage 31.0 failed 1 times, most recent failure: Lost task 0.0 in stage 31.0 
> (TID 597, localhost): java.lang.NullPointerException
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> 

[jira] [Updated] (SPARK-18851) DataSet Limit into Aggregate Results in NPE in Codegen

2016-12-13 Thread Russell Spitzer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Spitzer updated SPARK-18851:

Labels: regresion  (was: )

> DataSet Limit into Aggregate Results in NPE in Codegen
> --
>
> Key: SPARK-18851
> URL: https://issues.apache.org/jira/browse/SPARK-18851
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: regresion
>
> Performing a limit and then an aggregate seems to generate NPE vulnerable 
> code.
> Simple example here
> {code}
> case class ABCD ( a: Int, b:Int , c:Int, d: Option[Int])
> val ds = sc.parallelize(1 to 50).map( i => ABCD(i, i, i, if (i%2==0) None 
> else Some(i))).toDS
> ds.limit(5).distinct.show
> {code}
> {code}
> 16/12/13 14:02:37 ERROR Executor: Exception in task 0.0 in stage 31.0 (TID 
> 597)
> java.lang.NullPointerException
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 16/12/13 14:02:37 WARN TaskSetManager: Lost task 0.0 in stage 31.0 (TID 597, 
> localhost): java.lang.NullPointerException
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 16/12/13 14:02:37 ERROR TaskSetManager: Task 0 in stage 31.0 failed 1 times; 
> aborting job
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
> stage 31.0 failed 1 times, most recent failure: Lost task 0.0 in stage 31.0 
> (TID 597, localhost): java.lang.NullPointerException
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> 

[jira] [Updated] (SPARK-18851) DataSet Limit into Aggregate Results in NPE in Codegen

2016-12-13 Thread Russell Spitzer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Spitzer updated SPARK-18851:

Summary: DataSet Limit into Aggregate Results in NPE in Codegen  (was: 
DataSet limit.distinct Results in NPE in Codegen)

> DataSet Limit into Aggregate Results in NPE in Codegen
> --
>
> Key: SPARK-18851
> URL: https://issues.apache.org/jira/browse/SPARK-18851
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: Russell Spitzer
>Priority: Critical
>
> Performing a limit and then an aggregate seems to generate NPE vulnerable 
> code.
> Simple example here
> {code}
> case class ABCD ( a: Int, b:Int , c:Int, d: Option[Int])
> val ds = sc.parallelize(1 to 50).map( i => ABCD(i, i, i, if (i%2==0) None 
> else Some(i))).toDS
> ds.limit(5).distinct.show
> {code}
> {code}
> 16/12/13 14:02:37 ERROR Executor: Exception in task 0.0 in stage 31.0 (TID 
> 597)
> java.lang.NullPointerException
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 16/12/13 14:02:37 WARN TaskSetManager: Lost task 0.0 in stage 31.0 (TID 597, 
> localhost): java.lang.NullPointerException
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 16/12/13 14:02:37 ERROR TaskSetManager: Task 0 in stage 31.0 failed 1 times; 
> aborting job
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
> stage 31.0 failed 1 times, most recent failure: Lost task 0.0 in stage 31.0 
> (TID 597, localhost): java.lang.NullPointerException
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  

[jira] [Created] (SPARK-18851) DataSet limit.distinct Results in NPE in Codegen

2016-12-13 Thread Russell Spitzer (JIRA)
Russell Spitzer created SPARK-18851:
---

 Summary: DataSet limit.distinct Results in NPE in Codegen
 Key: SPARK-18851
 URL: https://issues.apache.org/jira/browse/SPARK-18851
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.0.2
Reporter: Russell Spitzer
Priority: Critical


Performing a limit and then an aggregate seems to generate NPE vulnerable code.

Simple example here

{code}
case class ABCD ( a: Int, b:Int , c:Int, d: Option[Int])
val ds = sc.parallelize(1 to 50).map( i => ABCD(i, i, i, if (i%2==0) None else 
Some(i))).toDS
ds.limit(5).distinct.show
{code}

{code}
16/12/13 14:02:37 ERROR Executor: Exception in task 0.0 in stage 31.0 (TID 597)
java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/12/13 14:02:37 WARN TaskSetManager: Lost task 0.0 in stage 31.0 (TID 597, 
localhost): java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

16/12/13 14:02:37 ERROR TaskSetManager: Task 0 in stage 31.0 failed 1 times; 
aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 31.0 failed 1 times, most recent failure: Lost task 0.0 in stage 31.0 
(TID 597, localhost): java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at 

[jira] [Comment Edited] (SPARK-17845) Improve window function frame boundary API in DataFrame

2016-10-09 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15561305#comment-15561305
 ] 

Russell Spitzer edited comment on SPARK-17845 at 10/10/16 5:14 AM:
---

FWIW, I Find (2) much more readable but I don't come from an SQL background.


was (Author: rspitzer):
FWIW, I Find (2) much more readable

> Improve window function frame boundary API in DataFrame
> ---
>
> Key: SPARK-17845
> URL: https://issues.apache.org/jira/browse/SPARK-17845
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>
> ANSI SQL uses the following to specify the frame boundaries for window 
> functions:
> {code}
> ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
> ROWS BETWEEN UNBOUNDED PRECEDING AND 3 PRECEDING
> ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
> ROWS BETWEEN CURRENT ROW AND UNBOUNDED PRECEDING
> ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
> {code}
> In Spark's DataFrame API, we use integer values to indicate relative position:
> - 0 means "CURRENT ROW"
> - -1 means "1 PRECEDING"
> - Long.MinValue means "UNBOUNDED PRECEDING"
> - Long.MaxValue to indicate "UNBOUNDED FOLLOWING"
> {code}
> // ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
> Window.rowsBetween(-3, +3)
> // ROWS BETWEEN UNBOUNDED PRECEDING AND 3 PRECEDING
> Window.rowsBetween(Long.MinValue, -3)
> // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
> Window.rowsBetween(Long.MinValue, 0)
> // ROWS BETWEEN CURRENT ROW AND UNBOUNDED PRECEDING
> Window.rowsBetween(0, Long.MaxValue)
> // ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
> Window.rowsBetween(Long.MinValue, Long.MaxValue)
> {code}
> I think using numeric values to indicate relative positions is actually a 
> good idea, but the reliance on Long.MinValue and Long.MaxValue to indicate 
> unbounded ends is pretty confusing:
> 1. The API is not self-evident. There is no way for a new user to figure out 
> how to indicate an unbounded frame by looking at just the API. The user has 
> to read the doc to figure this out.
> 2. It is weird Long.MinValue or Long.MaxValue has some special meaning.
> 3. Different languages have different min/max values, e.g. in Python we use 
> -sys.maxsize and +sys.maxsize.
> To make this API less confusing, we have a few options:
> Option 1. Add the following (additional) methods:
> {code}
> // ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
> Window.rowsBetween(-3, +3)  // this one exists already
> // ROWS BETWEEN UNBOUNDED PRECEDING AND 3 PRECEDING
> Window.rowsBetweenUnboundedPrecedingAnd(-3)
> // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
> Window.rowsBetweenUnboundedPrecedingAndCurrentRow()
> // ROWS BETWEEN CURRENT ROW AND UNBOUNDED PRECEDING
> Window.rowsBetweenCurrentRowAndUnboundedFollowing()
> // ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
> Window.rowsBetweenUnboundedPrecedingAndUnboundedFollowing()
> {code}
> This is obviously very verbose, but is very similar to how these functions 
> are done in SQL, and is perhaps the most obvious to end users, especially if 
> they come from SQL background.
> Option 2. Decouple the specification for frame begin and frame end into two 
> functions. Assume the boundary is unlimited unless specified.
> {code}
> // ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
> Window.rowsFrom(-3).rowsTo(3)
> // ROWS BETWEEN UNBOUNDED PRECEDING AND 3 PRECEDING
> Window.rowsTo(-3)
> // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
> Window.rowsToCurrent() or Window.rowsTo(0)
> // ROWS BETWEEN CURRENT ROW AND UNBOUNDED PRECEDING
> Window.rowsFromCurrent() or Window.rowsFrom(0)
> // ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
> // no need to specify
> {code}
> If we go with option 2, we should throw exceptions if users specify multiple 
> from's or to's. A variant of option 2 is to require explicitly specification 
> of begin/end even in the case of unbounded boundary, e.g.:
> {code}
> Window.rowsFromBeginning().rowsTo(-3)
> or
> Window.rowsFromUnboundedPreceding().rowsTo(-3)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17845) Improve window function frame boundary API in DataFrame

2016-10-09 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15561305#comment-15561305
 ] 

Russell Spitzer commented on SPARK-17845:
-

FWIW, I Find (2) much more readable

> Improve window function frame boundary API in DataFrame
> ---
>
> Key: SPARK-17845
> URL: https://issues.apache.org/jira/browse/SPARK-17845
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Reynold Xin
>Assignee: Reynold Xin
>
> ANSI SQL uses the following to specify the frame boundaries for window 
> functions:
> {code}
> ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
> ROWS BETWEEN UNBOUNDED PRECEDING AND 3 PRECEDING
> ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
> ROWS BETWEEN CURRENT ROW AND UNBOUNDED PRECEDING
> ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
> {code}
> In Spark's DataFrame API, we use integer values to indicate relative position:
> - 0 means "CURRENT ROW"
> - -1 means "1 PRECEDING"
> - Long.MinValue means "UNBOUNDED PRECEDING"
> - Long.MaxValue to indicate "UNBOUNDED FOLLOWING"
> {code}
> // ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
> Window.rowsBetween(-3, +3)
> // ROWS BETWEEN UNBOUNDED PRECEDING AND 3 PRECEDING
> Window.rowsBetween(Long.MinValue, -3)
> // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
> Window.rowsBetween(Long.MinValue, 0)
> // ROWS BETWEEN CURRENT ROW AND UNBOUNDED PRECEDING
> Window.rowsBetween(0, Long.MaxValue)
> // ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
> Window.rowsBetween(Long.MinValue, Long.MaxValue)
> {code}
> I think using numeric values to indicate relative positions is actually a 
> good idea, but the reliance on Long.MinValue and Long.MaxValue to indicate 
> unbounded ends is pretty confusing:
> 1. The API is not self-evident. There is no way for a new user to figure out 
> how to indicate an unbounded frame by looking at just the API. The user has 
> to read the doc to figure this out.
> 2. It is weird Long.MinValue or Long.MaxValue has some special meaning.
> 3. Different languages have different min/max values, e.g. in Python we use 
> -sys.maxsize and +sys.maxsize.
> To make this API less confusing, we have a few options:
> Option 1. Add the following (additional) methods:
> {code}
> // ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
> Window.rowsBetween(-3, +3)  // this one exists already
> // ROWS BETWEEN UNBOUNDED PRECEDING AND 3 PRECEDING
> Window.rowsBetweenUnboundedPrecedingAnd(-3)
> // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
> Window.rowsBetweenUnboundedPrecedingAndCurrentRow()
> // ROWS BETWEEN CURRENT ROW AND UNBOUNDED PRECEDING
> Window.rowsBetweenCurrentRowAndUnboundedFollowing()
> // ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
> Window.rowsBetweenUnboundedPrecedingAndUnboundedFollowing()
> {code}
> This is obviously very verbose, but is very similar to how these functions 
> are done in SQL, and is perhaps the most obvious to end users, especially if 
> they come from SQL background.
> Option 2. Decouple the specification for frame begin and frame end into two 
> functions. Assume the boundary is unlimited unless specified.
> {code}
> // ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
> Window.rowsFrom(-3).rowsTo(3)
> // ROWS BETWEEN UNBOUNDED PRECEDING AND 3 PRECEDING
> Window.rowsTo(-3)
> // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
> Window.rowsToCurrent() or Window.rowsTo(0)
> // ROWS BETWEEN CURRENT ROW AND UNBOUNDED PRECEDING
> Window.rowsFromCurrent() or Window.rowsFrom(0)
> // ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
> // no need to specify
> {code}
> If we go with option 2, we should throw exceptions if users specify multiple 
> from's or to's. A variant of option 2 is to require explicitly specification 
> of begin/end even in the case of unbounded boundary, e.g.:
> {code}
> Window.rowsFromBeginning().rowsTo(-3)
> or
> Window.rowsFromUnboundedPreceding().rowsTo(-3)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-10501) support UUID as an atomic type

2016-10-08 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15559271#comment-15559271
 ] 

Russell Spitzer commented on SPARK-10501:
-

It's not that we need it as a unique identifier. It's already a datatype in the 
Cassandra database but there is no direct translation to a spark sql type so a 
conversion to string must be done. In addition TimeUUIDs require a custom 
non-bytewise comparator so a greater than or less than lexical comparison of 
them is always incorrect. 

https://datastax-oss.atlassian.net/browse/SPARKC-405

> support UUID as an atomic type
> --
>
> Key: SPARK-10501
> URL: https://issues.apache.org/jira/browse/SPARK-10501
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Jon Haddad
>Priority: Minor
>
> It's pretty common to use UUIDs instead of integers in order to avoid 
> distributed counters.  
> I've added this, which at least lets me load dataframes that use UUIDs that I 
> can cast to strings:
> {code}
> class UUIDType(AtomicType):
> pass
> _type_mappings[UUID] = UUIDType
> _atomic_types.append(UUIDType)
> {code}
> But if I try to do anything else with the UUIDs, like this:
> {code}
> ratings.select("userid").distinct().collect()
> {code}
> I get this pile of fun: 
> {code}
> scala.MatchError: UUIDType (of class 
> org.apache.spark.sql.cassandra.types.UUIDType$)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-28 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15531265#comment-15531265
 ] 

Russell Spitzer commented on SPARK-17673:
-

Looks good on my end

{code}
scala> min2.union(min1).show()
+---+---+
| id|min|
+---+---+
|  A|  7|
|  A|  1|
+---+---+


scala> min1.union(min2).show()
+---+---+
| id|min|
+---+---+
|  A|  1|
|  A|  7|
+---+---+
{code}

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Assignee: Eric Liang
>Priority: Blocker
>  Labels: correctness
> Fix For: 2.0.1, 2.1.0
>
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525101#comment-15525101
 ] 

Russell Spitzer edited comment on SPARK-17673 at 9/27/16 5:17 AM:
--

{code}== Parsed Logical Plan ==
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102, col2#103]
: +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col1#102, col2#103]
  +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Analyzed Logical Plan ==
id: string, min: int
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102, col2#103]
: +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col1#102, col2#103]
  +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Optimized Logical Plan ==
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102]
: +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col2#103]
  +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Physical Plan ==
Union
:- *HashAggregate(keys=[id#101], functions=[min(col1#102)], output=[id#101, 
min#128])
:  +- Exchange hashpartitioning(id#101, 200)
: +- *HashAggregate(keys=[id#101], functions=[partial_min(col1#102)], 
output=[id#101, min#182])
:+- *Scan 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118 
[id#101,col1#102]
+- *HashAggregate(keys=[id#101], functions=[min(col2#103)], output=[id#101, 
min#137])
   +- ReusedExchange [id#101, min#190], Exchange hashpartitioning(id#101, 200)
{code}

So the relations do look exactly the same (even though they are not) in the 
optimized plan


was (Author: rspitzer):
{code}== Parsed Logical Plan ==
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102, col2#103]
: +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col1#102, col2#103]
  +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Analyzed Logical Plan ==
id: string, min: int
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102, col2#103]
: +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col1#102, col2#103]
  +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Optimized Logical Plan ==
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102]
: +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col2#103]
  +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Physical Plan ==
Union
:- *HashAggregate(keys=[id#101], functions=[min(col1#102)], output=[id#101, 
min#128])
:  +- Exchange hashpartitioning(id#101, 200)
: +- *HashAggregate(keys=[id#101], functions=[partial_min(col1#102)], 
output=[id#101, min#182])
:+- *Scan 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118 
[id#101,col1#102]
+- *HashAggregate(keys=[id#101], functions=[min(col2#103)], output=[id#101, 
min#137])
   +- ReusedExchange [id#101, min#190], Exchange hashpartitioning(id#101, 200)
{code}

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
> 

[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525101#comment-15525101
 ] 

Russell Spitzer commented on SPARK-17673:
-

{code}== Parsed Logical Plan ==
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102, col2#103]
: +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col1#102, col2#103]
  +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Analyzed Logical Plan ==
id: string, min: int
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102, col2#103]
: +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col1#102, col2#103]
  +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Optimized Logical Plan ==
Union
:- Aggregate [id#101], [id#101, min(col1#102) AS min#128]
:  +- Project [id#101, col1#102]
: +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118
+- Aggregate [id#101], [id#101, min(col2#103) AS min#137]
   +- Project [id#101, col2#103]
  +- Relation[id#101,col1#102,col2#103] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118

== Physical Plan ==
Union
:- *HashAggregate(keys=[id#101], functions=[min(col1#102)], output=[id#101, 
min#128])
:  +- Exchange hashpartitioning(id#101, 200)
: +- *HashAggregate(keys=[id#101], functions=[partial_min(col1#102)], 
output=[id#101, min#182])
:+- *Scan 
org.apache.spark.sql.cassandra.CassandraSourceRelation@486a9118 
[id#101,col1#102]
+- *HashAggregate(keys=[id#101], functions=[min(col2#103)], output=[id#101, 
min#137])
   +- ReusedExchange [id#101, min#190], Exchange hashpartitioning(id#101, 200)
{code}

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To 

[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525096#comment-15525096
 ] 

Russell Spitzer commented on SPARK-17673:
-

Ah yeah there would definitely be different pruning in both "source"s  Getting 
the optimized plan now

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525007#comment-15525007
 ] 

Russell Spitzer edited comment on SPARK-17673 at 9/27/16 4:12 AM:
--

Looking at this plan 
{code}```
Union
:- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
:  +- Exchange hashpartitioning(id#93, 200)
: +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
:+- *Scan 
org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 [id#93,col1#94]
+- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
   +- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)```
{code}
I see it reuses the hash aggregate from the partial min on col1 even though the 
hash aggregate that runs it does min col2. Am I reading that right? The column 
names don't even match so I'm confused how that gets through?


was (Author: rspitzer):
Looking at this plan 
```
Union
:- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
:  +- Exchange hashpartitioning(id#93, 200)
: +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
:+- *Scan 
org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 [id#93,col1#94]
+- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
   +- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)```

I see it reuses the hash aggregate from the partial min on col1 even though the 
hash aggregate that runs it does min col2. Am I reading that right? The column 
names don't even match so I'm confused how that gets through?

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525007#comment-15525007
 ] 

Russell Spitzer commented on SPARK-17673:
-

Looking at this plan 
```
Union
:- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
:  +- Exchange hashpartitioning(id#93, 200)
: +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
:+- *Scan 
org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 [id#93,col1#94]
+- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
   +- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)```

I see it reuses the hash aggregate from the partial min on col1 even though the 
hash aggregate that runs it does min col2. Am I reading that right? The column 
names don't even match so I'm confused how that gets through?

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524999#comment-15524999
 ] 

Russell Spitzer commented on SPARK-17673:
-

We shouldn't be ... The only thing we cache are underlying database connections 
and queries which shouldn't factor into this I would think :/

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524848#comment-15524848
 ] 

Russell Spitzer commented on SPARK-17673:
-

I couldn't get this to happen without C*, hopefully tomorrow I can get some 
guidance tomorrow :/ It could be a hashcode / equals thing but we don't 
override those in the base class. Also i'm a little confused because this 
should be the same "grouping" operation on the RDD just with a different 
aggregate. I don't know enough about the ReusedExchange to know when it's 
applied and why.

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524734#comment-15524734
 ] 

Russell Spitzer edited comment on SPARK-17673 at 9/27/16 1:39 AM:
--

Well in this case they are equal correct? We are using the same Dataframe with 
two different aggregation steps.

The Parquet example doesn't end up using the reusedExchange. It does the same 
plan as the parallelized one

{code}
PDF
== Physical Plan ==
Union
:- *HashAggregate(keys=[id#112], functions=[min(col1#113)])
:  +- Exchange hashpartitioning(id#112, 200)
: +- *HashAggregate(keys=[id#112], functions=[partial_min(col1#113)])
:+- *BatchedScan parquet [id#112,col1#113] Format: ParquetFormat, 
InputPaths: 
file:/Users/russellspitzer/repos/spark-cassandra-connector/ꟾ뫳㼎麡䰨틖㇗ཨᎪ贬, 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct
+- *HashAggregate(keys=[id#112], functions=[min(col2#114)])
   +- Exchange hashpartitioning(id#112, 200)
  +- *HashAggregate(keys=[id#112], functions=[partial_min(col2#114)])
 +- *BatchedScan parquet [id#112,col2#114] Format: ParquetFormat, 
InputPaths: 
file:/Users/russellspitzer/repos/spark-cassandra-connector/ꟾ뫳㼎麡䰨틖㇗ཨᎪ贬, 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct
{code}


was (Author: rspitzer):
Well in this case they are equal correct?

The Parquet example doesn't end up using the reusedExchange

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524734#comment-15524734
 ] 

Russell Spitzer edited comment on SPARK-17673 at 9/27/16 1:38 AM:
--

Well in this case they are equal correct?

The Parquet example doesn't end up using the reusedExchange


was (Author: rspitzer):
Well in this case they are equal correct?

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524734#comment-15524734
 ] 

Russell Spitzer commented on SPARK-17673:
-

Well in this case they are equal correct?

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524722#comment-15524722
 ] 

Russell Spitzer commented on SPARK-17673:
-

Ugh I made a typo in my Parquet Example I don't see it repoing there now. Let 
me run investigate a little more as to why this would affect the C* Source...

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>Priority: Critical
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524641#comment-15524641
 ] 

Russell Spitzer commented on SPARK-17673:
-

I only ran this on 2.0.0 and 2.0.1

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Spitzer updated SPARK-17673:

Labels: correctness  (was: )

> Reused Exchange Aggregations Produce Incorrect Results
> --
>
> Key: SPARK-17673
> URL: https://issues.apache.org/jira/browse/SPARK-17673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Russell Spitzer
>  Labels: correctness
>
> https://datastax-oss.atlassian.net/browse/SPARKC-429
> Was brought to my attention where the following code produces incorrect 
> results
> {code}
>  val data = List(TestData("A", 1, 7))
> val frame = 
> session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))
> frame.createCassandraTable(
>   keySpaceName,
>   table,
>   partitionKeyColumns = Some(Seq("id")))
> frame
>   .write
>   .format("org.apache.spark.sql.cassandra")
>   .mode(SaveMode.Append)
>   .options(Map("table" -> table, "keyspace" -> keySpaceName))
>   .save()
> val loaded = sparkSession.sqlContext
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> table, "keyspace" -> ks))
>   .load()
>   .select("id", "col1", "col2")
> val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
> val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
>  min1.union(min2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
>  Should be 
>   | A| 1|
>   | A| 7|
>  */
> {code}
> I looked into the explain pattern and saw 
> {code}
> Union
> :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
> :  +- Exchange hashpartitioning(id#93, 200)
> : +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
> :+- *Scan 
> org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 
> [id#93,col1#94]
> +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
>+- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
> {code}
> Which was different than using a parallelized collection as the DF backing. 
> So I tested the same code with a Parquet backed DF and saw the same results.
> {code}
> frame.write.parquet("garbagetest")
> val parquet = sparkSession.read.parquet("garbagetest").select("id", 
> "col1", "col2")
> println("PDF")
> parquetmin1.union(parquetmin2).explain()
> parquetmin1.union(parquetmin2).show()
> /* prints:
>   +---+---+
>   | id|min|
>   +---+---+
>   |  A|  1|
>   |  A|  1|
>   +---+---+
> */
> {code}
> Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-17673) Reused Exchange Aggregations Produce Incorrect Results

2016-09-26 Thread Russell Spitzer (JIRA)
Russell Spitzer created SPARK-17673:
---

 Summary: Reused Exchange Aggregations Produce Incorrect Results
 Key: SPARK-17673
 URL: https://issues.apache.org/jira/browse/SPARK-17673
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.0.0, 2.0.1
Reporter: Russell Spitzer


https://datastax-oss.atlassian.net/browse/SPARKC-429

Was brought to my attention where the following code produces incorrect results

{code}
 val data = List(TestData("A", 1, 7))
val frame = 
session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))

frame.createCassandraTable(
  keySpaceName,
  table,
  partitionKeyColumns = Some(Seq("id")))

frame
  .write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Append)
  .options(Map("table" -> table, "keyspace" -> keySpaceName))
  .save()

val loaded = sparkSession.sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> table, "keyspace" -> ks))
  .load()
  .select("id", "col1", "col2")
val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
 min1.union(min2).show()
/* prints:
  +---+---+
  | id|min|
  +---+---+
  |  A|  1|
  |  A|  1|
  +---+---+
 Should be 
  | A| 1|
  | A| 7|
 */
{code}

I looked into the explain pattern and saw 
{code}
Union
:- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
:  +- Exchange hashpartitioning(id#93, 200)
: +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
:+- *Scan 
org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 [id#93,col1#94]
+- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
   +- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
{code}

Which was different than using a parallelized collection as the DF backing. So 
I tested the same code with a Parquet backed DF and saw the same results.

{code}
frame.write.parquet("garbagetest")
val parquet = sparkSession.read.parquet("garbagetest").select("id", "col1", 
"col2")
println("PDF")
parquetmin1.union(parquetmin2).explain()
parquetmin1.union(parquetmin2).show()
/* prints:
  +---+---+
  | id|min|
  +---+---+
  |  A|  1|
  |  A|  1|
  +---+---+
*/
{code}

Which leads me to believe there is something wrong with the reused exchange. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16614) DirectJoin with DataSource for SparkSQL

2016-09-02 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15458784#comment-15458784
 ] 

Russell Spitzer commented on SPARK-16614:
-

Yes. This would be similar to how Presto works by default. The normal operation 
is to pull only the keys in the Source unless the Source is very large at which 
point it does the full scan and then join.


Quick example if DF A only has 3 rows, we only want to do 3 requests against C* 
(if possible)

In the Cassandra case joins against certain columns can be done directly while 
some columns will still require a Full Table Scan

> DirectJoin with DataSource for SparkSQL
> ---
>
> Key: SPARK-16614
> URL: https://issues.apache.org/jira/browse/SPARK-16614
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Russell Spitzer
>
> Join behaviors against some datasources can be improved by skipping a full 
> scan and instead performing a series of point lookups.
> An example
> {code}DataFrame A contains { key1, key5, key302, ... key 50923423} 
> DataFrame B is a source reading from a C* database with keys {key1, key2, 
> key3 }
> a.join(b){code}
> Currently this will cause the entirety of the DataFrame B to be read into 
> memory before performing a Join. Instead it would be useful if we could 
> expose another api, {{DirectJoinSource}} which allowed connectors to provide 
> a means of requesting a non-contiguous subset of keys from a DataSource.
> This kind of lookup would behave like the joinWithCasandraTable call in the 
> Spark Cassandra Connector 
> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#using-joinwithcassandratable.
>  
> We find that this is much more useful when the end user is requesting only a 
> small portion of well defined records. I believe this could be applicable to 
> a variety of datasources where reading the entire source is inefficient 
> compared to point lookups.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16725) Migrate Guava to 16+?

2016-08-16 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15423555#comment-15423555
 ] 

Russell Spitzer commented on SPARK-16725:
-

I'm well aware as we've been dealing with this since 1.0, that's why we begun 
the process of shading Guava for Hadoop based builds, now though we are stuck 
doing it for all builds :(

> Migrate Guava to 16+?
> -
>
> Key: SPARK-16725
> URL: https://issues.apache.org/jira/browse/SPARK-16725
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 2.0.1
>Reporter: Min Wei
>Priority: Minor
>   Original Estimate: 12h
>  Remaining Estimate: 12h
>
> Currently Spark depends on an old version of Guava, version 14. However 
> Spark-cassandra driver asserts on Guava version 16 and above. 
> It would be great to update the Guava dependency to version 16+
> diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala 
> b/core/src/main/scala/org/apache/spark/SecurityManager.scala
> index f72c7de..abddafe 100644
> --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
> +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
> @@ -23,7 +23,7 @@ import java.security.{KeyStore, SecureRandom}
>  import java.security.cert.X509Certificate
>  import javax.net.ssl._
>  
> -import com.google.common.hash.HashCodes
> +import com.google.common.hash.HashCode
>  import com.google.common.io.Files
>  import org.apache.hadoop.io.Text
>  
> @@ -432,7 +432,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
>  val secret = new Array[Byte](length)
>  rnd.nextBytes(secret)
>  
> -val cookie = HashCodes.fromBytes(secret).toString()
> +val cookie = HashCode.fromBytes(secret).toString()
>  SparkHadoopUtil.get.addSecretKeyToUserCredentials(SECRET_LOOKUP_KEY, 
> cookie)
>  cookie
>} else {
> diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
> b/core/src/main/scala/org/apache/spark/SparkEnv.scala
> index af50a6d..02545ae 100644
> --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
> +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
> @@ -72,7 +72,7 @@ class SparkEnv (
>  
>// A general, soft-reference map for metadata needed during HadoopRDD 
> split computation
>// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
> -  private[spark] val hadoopJobMetadata = new 
> MapMaker().softValues().makeMap[String, Any]()
> +  private[spark] val hadoopJobMetadata = new 
> MapMaker().weakValues().makeMap[String, Any]()
>  
>private[spark] var driverTmpDir: Option[String] = None
>  
> diff --git a/pom.xml b/pom.xml
> index d064cb5..7c3e036 100644
> --- a/pom.xml
> +++ b/pom.xml
> @@ -368,8 +368,7 @@
>
>  com.google.guava
>  guava
> -14.0.1
> -provided
> +19.0
>
>
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16725) Migrate Guava to 16+?

2016-08-16 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15423539#comment-15423539
 ] 

Russell Spitzer commented on SPARK-16725:
-

In our case it's exposing a library which exposes the shaded code. Ie we 
include the Cassandra Java Driver which publicly exposes Guava in some places. 
So those access points are necessarily broken but it's not something we can 
directly control.

> Migrate Guava to 16+?
> -
>
> Key: SPARK-16725
> URL: https://issues.apache.org/jira/browse/SPARK-16725
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 2.0.1
>Reporter: Min Wei
>Priority: Minor
>   Original Estimate: 12h
>  Remaining Estimate: 12h
>
> Currently Spark depends on an old version of Guava, version 14. However 
> Spark-cassandra driver asserts on Guava version 16 and above. 
> It would be great to update the Guava dependency to version 16+
> diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala 
> b/core/src/main/scala/org/apache/spark/SecurityManager.scala
> index f72c7de..abddafe 100644
> --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
> +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
> @@ -23,7 +23,7 @@ import java.security.{KeyStore, SecureRandom}
>  import java.security.cert.X509Certificate
>  import javax.net.ssl._
>  
> -import com.google.common.hash.HashCodes
> +import com.google.common.hash.HashCode
>  import com.google.common.io.Files
>  import org.apache.hadoop.io.Text
>  
> @@ -432,7 +432,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
>  val secret = new Array[Byte](length)
>  rnd.nextBytes(secret)
>  
> -val cookie = HashCodes.fromBytes(secret).toString()
> +val cookie = HashCode.fromBytes(secret).toString()
>  SparkHadoopUtil.get.addSecretKeyToUserCredentials(SECRET_LOOKUP_KEY, 
> cookie)
>  cookie
>} else {
> diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
> b/core/src/main/scala/org/apache/spark/SparkEnv.scala
> index af50a6d..02545ae 100644
> --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
> +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
> @@ -72,7 +72,7 @@ class SparkEnv (
>  
>// A general, soft-reference map for metadata needed during HadoopRDD 
> split computation
>// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
> -  private[spark] val hadoopJobMetadata = new 
> MapMaker().softValues().makeMap[String, Any]()
> +  private[spark] val hadoopJobMetadata = new 
> MapMaker().weakValues().makeMap[String, Any]()
>  
>private[spark] var driverTmpDir: Option[String] = None
>  
> diff --git a/pom.xml b/pom.xml
> index d064cb5..7c3e036 100644
> --- a/pom.xml
> +++ b/pom.xml
> @@ -368,8 +368,7 @@
>
>  com.google.guava
>  guava
> -14.0.1
> -provided
> +19.0
>
>
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16725) Migrate Guava to 16+?

2016-08-16 Thread Russell Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15423498#comment-15423498
 ] 

Russell Spitzer commented on SPARK-16725:
-

I think *But it works* is a bit of an overstatement. It "works" when those 
shaded libraries are never exposed through a public api but it is basically 
broken whenever they are.

> Migrate Guava to 16+?
> -
>
> Key: SPARK-16725
> URL: https://issues.apache.org/jira/browse/SPARK-16725
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 2.0.1
>Reporter: Min Wei
>Priority: Minor
>   Original Estimate: 12h
>  Remaining Estimate: 12h
>
> Currently Spark depends on an old version of Guava, version 14. However 
> Spark-cassandra driver asserts on Guava version 16 and above. 
> It would be great to update the Guava dependency to version 16+
> diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala 
> b/core/src/main/scala/org/apache/spark/SecurityManager.scala
> index f72c7de..abddafe 100644
> --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
> +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
> @@ -23,7 +23,7 @@ import java.security.{KeyStore, SecureRandom}
>  import java.security.cert.X509Certificate
>  import javax.net.ssl._
>  
> -import com.google.common.hash.HashCodes
> +import com.google.common.hash.HashCode
>  import com.google.common.io.Files
>  import org.apache.hadoop.io.Text
>  
> @@ -432,7 +432,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
>  val secret = new Array[Byte](length)
>  rnd.nextBytes(secret)
>  
> -val cookie = HashCodes.fromBytes(secret).toString()
> +val cookie = HashCode.fromBytes(secret).toString()
>  SparkHadoopUtil.get.addSecretKeyToUserCredentials(SECRET_LOOKUP_KEY, 
> cookie)
>  cookie
>} else {
> diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
> b/core/src/main/scala/org/apache/spark/SparkEnv.scala
> index af50a6d..02545ae 100644
> --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
> +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
> @@ -72,7 +72,7 @@ class SparkEnv (
>  
>// A general, soft-reference map for metadata needed during HadoopRDD 
> split computation
>// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
> -  private[spark] val hadoopJobMetadata = new 
> MapMaker().softValues().makeMap[String, Any]()
> +  private[spark] val hadoopJobMetadata = new 
> MapMaker().weakValues().makeMap[String, Any]()
>  
>private[spark] var driverTmpDir: Option[String] = None
>  
> diff --git a/pom.xml b/pom.xml
> index d064cb5..7c3e036 100644
> --- a/pom.xml
> +++ b/pom.xml
> @@ -368,8 +368,7 @@
>
>  com.google.guava
>  guava
> -14.0.1
> -provided
> +19.0
>
>
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-16614) DirectJoin with DataSource for SparkSQL

2016-07-19 Thread Russell Spitzer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Spitzer updated SPARK-16614:

Description: 
Join behaviors against some datasources can be improved by skipping a full scan 
and instead performing a series of point lookups.

An example

{code}DataFrame A contains { key1, key5, key302, ... key 50923423} 
DataFrame B is a source reading from a C* database with keys {key1, key2, 
key3 }
a.join(b){code}

Currently this will cause the entirety of the DataFrame B to be read into 
memory before performing a Join. Instead it would be useful if we could expose 
another api, {{DirectJoinSource}} which allowed connectors to provide a means 
of requesting a non-contiguous subset of keys from a DataSource.

This kind of lookup would behave like the joinWithCasandraTable call in the 
Spark Cassandra Connector 
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#using-joinwithcassandratable.
 

We find that this is much more useful when the end user is requesting only a 
small portion of well defined records. I believe this could be applicable to a 
variety of datasources where reading the entire source is inefficient compared 
to point lookups.





  was:
Join behaviors against some datasources can be improved by skipping a full scan 
and instead performing a series of point lookups.

An example

{code}DataFrame A contains { key1, key5, key302, ... key 50923423} 
DataFrame B is a source reading from a C* database with keys {key1, key2, 
key3 }
a.join(b){code}

Currently this will cause the entirety of the DataFrame B to be read into 
memory before performing a Join. Instead it would be useful if we could expose 
another api, {{DirectJoinSource}} which allowed connectors to provide a means 
of requests a non-contiguous subset of keys from a DataSource.

This kind of lookup would behave like the joinWithCasandraTable call in the 
Spark Cassandra Connector 
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#using-joinwithcassandratable.
 

We find that this is much more useful when the end user is requesting only a 
small portion of well defined records. I believe this could be applicable to a 
variety of datasources where reading the entire source is inefficient compared 
to point lookups.






> DirectJoin with DataSource for SparkSQL
> ---
>
> Key: SPARK-16614
> URL: https://issues.apache.org/jira/browse/SPARK-16614
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Russell Spitzer
>
> Join behaviors against some datasources can be improved by skipping a full 
> scan and instead performing a series of point lookups.
> An example
> {code}DataFrame A contains { key1, key5, key302, ... key 50923423} 
> DataFrame B is a source reading from a C* database with keys {key1, key2, 
> key3 }
> a.join(b){code}
> Currently this will cause the entirety of the DataFrame B to be read into 
> memory before performing a Join. Instead it would be useful if we could 
> expose another api, {{DirectJoinSource}} which allowed connectors to provide 
> a means of requesting a non-contiguous subset of keys from a DataSource.
> This kind of lookup would behave like the joinWithCasandraTable call in the 
> Spark Cassandra Connector 
> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#using-joinwithcassandratable.
>  
> We find that this is much more useful when the end user is requesting only a 
> small portion of well defined records. I believe this could be applicable to 
> a variety of datasources where reading the entire source is inefficient 
> compared to point lookups.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-16616) Allow Catalyst to take Advantage of Hash Partitioned DataSources

2016-07-18 Thread Russell Spitzer (JIRA)
Russell Spitzer created SPARK-16616:
---

 Summary: Allow Catalyst to take Advantage of Hash Partitioned 
DataSources
 Key: SPARK-16616
 URL: https://issues.apache.org/jira/browse/SPARK-16616
 Project: Spark
  Issue Type: New Feature
Reporter: Russell Spitzer


Many Distributed Databases provide hash partitioned data (in contrast to data 
partitioned on a specific column) and this information can be used to greatly 
enhance Spark performance. 

For example: 

Data within Cassandra is distributed based on a Hash of the "Partition Key" 
which is a set of columns. This means all values read from the database which 
contain the same "partition key" will exist in the same Spark Partition. When 
these rows are joined with themselves or aggregated on these "Partition Key" 
columns there is no need to do a shuffle.

{code}
CREATE TABLE (UserID int, purchase int, amount int, PRIMARY KEY (customer, 
purchase))
{code}

Would internally (using the SparkCassandraConnector) make an RDD that looks like

{code}
Spark Partition 1 :  (1, 1, 5), (1, 2, 6), (432, 1, 10)  
Spark Partition 2 :  (2, 1, 4), (2, 2, 5), (700, 1, 1) ...
{code}

Where the all values for {{UserID}} 1 are in the First Partition but the values 
contained within Spark Partition 1 do not cover a contiguous range of values 
for {{UserID}}

Like with normal RDDs, it would be nice if we could expose a Partitioning 
function that (given the key value) we could indicate what partition the row 
would be in. This information could also be used in aggregates and joins. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-16614) DirectJoin with DataSource for SparkSQL

2016-07-18 Thread Russell Spitzer (JIRA)
Russell Spitzer created SPARK-16614:
---

 Summary: DirectJoin with DataSource for SparkSQL
 Key: SPARK-16614
 URL: https://issues.apache.org/jira/browse/SPARK-16614
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 2.0.0
Reporter: Russell Spitzer


Join behaviors against some datasources can be improved by skipping a full scan 
and instead performing a series of point lookups.

An example

{code}DataFrame A contains { key1, key5, key302, ... key 50923423} 
DataFrame B is a source reading from a C* database with keys {key1, key2, 
key3 }
a.join(b){code}

Currently this will cause the entirety of the DataFrame B to be read into 
memory before performing a Join. Instead it would be useful if we could expose 
another api, {{DirectJoinSource}} which allowed connectors to provide a means 
of requests a non-contiguous subset of keys from a DataSource.

This kind of lookup would behave like the joinWithCasandraTable call in the 
Spark Cassandra Connector 
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#using-joinwithcassandratable.
 

We find that this is much more useful when the end user is requesting only a 
small portion of well defined records. I believe this could be applicable to a 
variety of datasources where reading the entire source is inefficient compared 
to point lookups.







--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org