[jira] [Assigned] (SPARK-24397) Add TaskContext.getLocalProperties in Python

2018-05-25 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24397:


Assignee: Tathagata Das  (was: Apache Spark)

> Add TaskContext.getLocalProperties in Python
> 
>
> Key: SPARK-24397
> URL: https://issues.apache.org/jira/browse/SPARK-24397
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24397) Add TaskContext.getLocalProperties in Python

2018-05-25 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491429#comment-16491429
 ] 

Apache Spark commented on SPARK-24397:
--

User 'tdas' has created a pull request for this issue:
https://github.com/apache/spark/pull/21437

> Add TaskContext.getLocalProperties in Python
> 
>
> Key: SPARK-24397
> URL: https://issues.apache.org/jira/browse/SPARK-24397
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24397) Add TaskContext.getLocalProperties in Python

2018-05-25 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24397:


Assignee: Apache Spark  (was: Tathagata Das)

> Add TaskContext.getLocalProperties in Python
> 
>
> Key: SPARK-24397
> URL: https://issues.apache.org/jira/browse/SPARK-24397
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Tathagata Das
>Assignee: Apache Spark
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24250) support accessing SQLConf inside tasks

2018-05-25 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491419#comment-16491419
 ] 

Apache Spark commented on SPARK-24250:
--

User 'gatorsmile' has created a pull request for this issue:
https://github.com/apache/spark/pull/21436

> support accessing SQLConf inside tasks
> --
>
> Key: SPARK-24250
> URL: https://issues.apache.org/jira/browse/SPARK-24250
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
> Fix For: 2.4.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24396) Add Structured Streaming ForeachWriter for python

2018-05-25 Thread Tathagata Das (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491415#comment-16491415
 ] 

Tathagata Das commented on SPARK-24396:
---

TaskContext.getLocalProperty in Python is needed for getting the 
batchId/epochId that StreamExecution passes on as a Spark job-local 
property.
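
For illustration only (not part of the original comment), a minimal PySpark sketch of how the proposed API could be used inside a partition-processing function; the property key shown is an assumption about what StreamExecution sets, not a documented constant.
{code:python}
from pyspark import TaskContext

def process_partition(rows):
    ctx = TaskContext.get()
    # getLocalProperty is the method this ticket proposes; the key name below
    # is an assumption about what StreamExecution sets.
    batch_id = ctx.getLocalProperty("streaming.sql.batchId")
    for row in rows:
        pass  # conditionally process `row` based on batch_id

# hypothetical usage: df.rdd.foreachPartition(process_partition)
{code}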

> Add Structured Streaming ForeachWriter for python
> -
>
> Key: SPARK-24396
> URL: https://issues.apache.org/jira/browse/SPARK-24396
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
>
> Users should be able to write ForeachWriter code in Python; that is, they 
> should be able to use the partition id and the version/batchId/epochId to 
> conditionally process rows.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24396) Add Structured Streaming ForeachWriter for python

2018-05-25 Thread Tathagata Das (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491415#comment-16491415
 ] 

Tathagata Das edited comment on SPARK-24396 at 5/26/18 12:07 AM:
-

TaskContext.getLocalProperty (SPARK-24397) in Python is needed for getting the 
batchId/epochId that is passed on by StreamExecution as Spark job local 
property.


was (Author: tdas):
TaskContext.getLocalProperty in Python is needed for getting the 
batchId/epochId that is passed on by StreamExecution as Spark job local 
property.

> Add Structured Streaming ForeachWriter for python
> -
>
> Key: SPARK-24396
> URL: https://issues.apache.org/jira/browse/SPARK-24396
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
>
> Users should be able to write ForeachWriter code in Python; that is, they 
> should be able to use the partition id and the version/batchId/epochId to 
> conditionally process rows.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24397) Add TaskContext.getLocalProperties in Python

2018-05-25 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-24397:
--
Issue Type: New Feature  (was: Sub-task)
Parent: (was: SPARK-24396)

> Add TaskContext.getLocalProperties in Python
> 
>
> Key: SPARK-24397
> URL: https://issues.apache.org/jira/browse/SPARK-24397
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24397) Add TaskContext.getLocalProperties in Python

2018-05-25 Thread Tathagata Das (JIRA)
Tathagata Das created SPARK-24397:
-

 Summary: Add TaskContext.getLocalProperties in Python
 Key: SPARK-24397
 URL: https://issues.apache.org/jira/browse/SPARK-24397
 Project: Spark
  Issue Type: Sub-task
  Components: PySpark
Affects Versions: 2.3.0
Reporter: Tathagata Das
Assignee: Tathagata Das






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24396) Add Structured Streaming ForeachWriter for python

2018-05-25 Thread Tathagata Das (JIRA)
Tathagata Das created SPARK-24396:
-

 Summary: Add Structured Streaming ForeachWriter for python
 Key: SPARK-24396
 URL: https://issues.apache.org/jira/browse/SPARK-24396
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 2.3.0
Reporter: Tathagata Das
Assignee: Tathagata Das


Users should be able to write ForeachWriter code in Python; that is, they 
should be able to use the partition id and the version/batchId/epochId to 
conditionally process rows.
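
For illustration only (not part of this ticket's text), one shape such a Python writer could take; the open/process/close method names mirror the Scala ForeachWriter, and both they and the writeStream.foreach usage in the trailing comment are assumptions about the eventual PySpark API.
{code:python}
class ConditionalWriter(object):
    """Sketch of a Python ForeachWriter; method names mirror Scala's API."""

    def open(self, partition_id, epoch_id):
        # Remember where we are; returning False would tell Spark to skip
        # this partition for this epoch.
        self.partition_id = partition_id
        self.epoch_id = epoch_id
        return True

    def process(self, row):
        # Conditionally process rows using the partition id and epoch/batch id.
        if self.epoch_id % 2 == 0:
            print(self.partition_id, self.epoch_id, row)

    def close(self, error):
        pass

# hypothetical usage once the API exists:
# streaming_df.writeStream.foreach(ConditionalWriter()).start()
{code}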



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24359) SPIP: ML Pipelines in R

2018-05-25 Thread Joseph K. Bradley (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491412#comment-16491412
 ] 

Joseph K. Bradley commented on SPARK-24359:
---

Regarding separating repos: What's the conclusion?  If feasible, I really hope 
this can be in the apache/spark repo to encourage contributors to add R 
wrappers whenever they add new MLlib APIs (just like it's pretty easy to add 
Python wrappers nowadays).

Regarding CRAN releases: I'd expect it to be well worth it to say SparkML minor 
releases correspond to Spark minor releases.  Users should not expect SparkML 
2.4 to work with Spark 2.3 (since R would encounter missing Java APIs).  I'm 
less sure about patch releases.  (Ideally, this would all be solved by us 
following semantic versioning, but that would require that we never add 
Experimental APIs to SparkML.)  If we can solve the maintainability issues with 
CRAN compatibility via integration tests, then I figure it'd be ideal to treat 
SparkML just like SparkR and PySpark, releasing in sync with the rest of Spark. 
 Thoughts?

> SPIP: ML Pipelines in R
> ---
>
> Key: SPARK-24359
> URL: https://issues.apache.org/jira/browse/SPARK-24359
> Project: Spark
>  Issue Type: Improvement
>  Components: SparkR
>Affects Versions: 3.0.0
>Reporter: Hossein Falaki
>Priority: Major
>  Labels: SPIP
> Attachments: SparkML_ ML Pipelines in R-v2.pdf, SparkML_ ML Pipelines 
> in R.pdf
>
>
> h1. Background and motivation
> SparkR supports calling MLlib functionality with an [R-friendly 
> API|https://docs.google.com/document/d/10NZNSEurN2EdWM31uFYsgayIPfCFHiuIu3pCWrUmP_c/].
>  Since Spark 1.5 the (new) SparkML API which is based on [pipelines and 
> parameters|https://docs.google.com/document/d/1rVwXRjWKfIb-7PI6b86ipytwbUH7irSNLF1_6dLmh8o]
>  has matured significantly. It allows users to build and maintain complicated 
> machine learning pipelines. A lot of this functionality is difficult to 
> expose using the simple formula-based API in SparkR.
> We propose a new R package, _SparkML_, to be distributed along with SparkR as 
> part of Apache Spark. This new package will be built on top of SparkR’s APIs 
> to expose SparkML’s pipeline APIs and functionality.
> *Why not SparkR?*
> SparkR package contains ~300 functions. Many of these shadow functions in 
> base and other popular CRAN packages. We think adding more functions to 
> SparkR will degrade usability and make maintenance harder.
> *Why not sparklyr?*
> sparklyr is an R package developed by RStudio Inc. to expose Spark API to R 
> users. sparklyr includes MLlib API wrappers, but to the best of our knowledge 
> they are not comprehensive. Also, we propose a code-gen approach for this 
> package to minimize the work needed to expose future MLlib APIs, but sparklyr’s 
> API is manually written.
> h1. Target Personas
>  * Existing SparkR users who need more flexible SparkML API
>  * R users (data scientists, statisticians) who wish to build Spark ML 
> pipelines in R
> h1. Goals
>  * R users can install SparkML from CRAN
>  * R users will be able to import SparkML independent from SparkR
>  * After setting up a Spark session R users can
>  ** create a pipeline by chaining individual components and specifying their 
> parameters
>  ** tune a pipeline in parallel, taking advantage of Spark
>  ** inspect a pipeline’s parameters and evaluation metrics
>  ** repeatedly apply a pipeline
>  * MLlib contributors can easily add R wrappers for new MLlib Estimators and 
> Transformers
> h1. Non-Goals
>  * Adding new algorithms to SparkML R package which do not exist in Scala
>  * Parallelizing existing CRAN packages
>  * Changing existing SparkR ML wrapping API
> h1. Proposed API Changes
> h2. Design goals
> When encountering trade-offs in the API, we will choose based on the following 
> list of priorities. The API choice that addresses a higher priority goal will 
> be chosen.
>  # *Comprehensive coverage of MLlib API:* Design choices that make R coverage 
> of future ML algorithms difficult will be ruled out.
>  * *Semantic clarity*: We attempt to minimize confusion with other packages. 
> Between conciseness and clarity, we will choose clarity.
>  * *Maintainability and testability:* API choices that require manual 
> maintenance or make testing difficult should be avoided.
>  * *Interoperability with rest of Spark components:* We will keep the R API 
> as thin as possible and keep all functionality implementation in JVM/Scala.
>  * *Being natural to R users:* Ultimate users of this package are R users and 
> they should find it easy and natural to use.
> The API will follow familiar R function syntax, where the object is passed as 
> the first argument of the method:  do_something(obj, arg1, arg2). All 
> functions are snake_case (e.g., 

[jira] [Updated] (SPARK-24359) SPIP: ML Pipelines in R

2018-05-25 Thread Joseph K. Bradley (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph K. Bradley updated SPARK-24359:
--
Description: 
h1. Background and motivation

SparkR supports calling MLlib functionality with an [R-friendly 
API|https://docs.google.com/document/d/10NZNSEurN2EdWM31uFYsgayIPfCFHiuIu3pCWrUmP_c/].
 Since Spark 1.5 the (new) SparkML API which is based on [pipelines and 
parameters|https://docs.google.com/document/d/1rVwXRjWKfIb-7PI6b86ipytwbUH7irSNLF1_6dLmh8o]
 has matured significantly. It allows users to build and maintain complicated 
machine learning pipelines. A lot of this functionality is difficult to expose 
using the simple formula-based API in SparkR.

We propose a new R package, _SparkML_, to be distributed along with SparkR as 
part of Apache Spark. This new package will be built on top of SparkR’s APIs to 
expose SparkML’s pipeline APIs and functionality.

*Why not SparkR?*

SparkR package contains ~300 functions. Many of these shadow functions in base 
and other popular CRAN packages. We think adding more functions to SparkR will 
degrade usability and make maintenance harder.

*Why not sparklyr?*

sparklyr is an R package developed by RStudio Inc. to expose Spark API to R 
users. sparklyr includes MLlib API wrappers, but to the best of our knowledge 
they are not comprehensive. Also, we propose a code-gen approach for this 
package to minimize the work needed to expose future MLlib APIs, but sparklyr’s API 
is manually written.
h1. Target Personas
 * Existing SparkR users who need more flexible SparkML API
 * R users (data scientists, statisticians) who wish to build Spark ML 
pipelines in R

h1. Goals
 * R users can install SparkML from CRAN
 * R users will be able to import SparkML independent from SparkR
 * After setting up a Spark session R users can
 ** create a pipeline by chaining individual components and specifying their 
parameters
 ** tune a pipeline in parallel, taking advantage of Spark
 ** inspect a pipeline’s parameters and evaluation metrics
 ** repeatedly apply a pipeline
 * MLlib contributors can easily add R wrappers for new MLlib Estimators and 
Transformers

h1. Non-Goals
 * Adding new algorithms to SparkML R package which do not exist in Scala
 * Parallelizing existing CRAN packages
 * Changing existing SparkR ML wrapping API

h1. Proposed API Changes
h2. Design goals

When encountering trade-offs in the API, we will choose based on the following list 
of priorities. The API choice that addresses a higher priority goal will be 
chosen.
 # *Comprehensive coverage of MLlib API:* Design choices that make R coverage 
of future ML algorithms difficult will be ruled out.

 * *Semantic clarity*: We attempt to minimize confusion with other packages. 
Between conciseness and clarity, we will choose clarity.

 * *Maintainability and testability:* API choices that require manual 
maintenance or make testing difficult should be avoided.

 * *Interoperability with rest of Spark components:* We will keep the R API as 
thin as possible and keep all functionality implementation in JVM/Scala.

 * *Being natural to R users:* Ultimate users of this package are R users and 
they should find it easy and natural to use.

The API will follow familiar R function syntax, where the object is passed as 
the first argument of the method:  do_something(obj, arg1, arg2). All functions 
are snake_case (e.g., {{spark_logistic_regression()}} and {{set_max_iter()}}). 
If a constructor gets arguments, they will be named arguments. For example:
{code:java}
> lr <- set_reg_param(set_max_iter(spark_logistic_regression(), 10), 0.1){code}
When calls need to be chained, as in the above example, the syntax translates 
nicely to a natural pipeline style with help from the very popular [magrittr 
package|https://cran.r-project.org/web/packages/magrittr/index.html]. For 
example:
{code:java}
> logistic_regression() %>% set_max_iter(10) %>% set_reg_param(0.01) -> lr{code}
h2. Namespace

All new API will be under a new CRAN package, named SparkML. The package should 
be usable without needing SparkR in the namespace. The package will introduce a 
number of S4 classes that inherit from four basic classes. Here we will list 
the basic types with a few examples. An object of any child class can be 
instantiated with a function call that starts with {{spark_}}.
h2. Pipeline & PipelineStage

A pipeline object contains one or more stages.  
{code:java}
> pipeline <- spark_pipeline() %>% set_stages(stage1, stage2, stage3){code}
Here stage1, stage2, etc. are S4 objects of type PipelineStage, and pipeline is 
an object of type Pipeline.
h2. Transformers

A Transformer is an algorithm that can transform one SparkDataFrame into 
another SparkDataFrame.

*Example API:*
{code:java}
> tokenizer <- spark_tokenizer() %>%
    set_input_col("text") %>%
    set_output_col("words")
> tokenized.df <- tokenizer %>% transform(df) {code}

[jira] [Commented] (SPARK-24300) generateLDAData in ml.cluster.LDASuite didn't set seed correctly

2018-05-25 Thread Lu Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491359#comment-16491359
 ] 

Lu Wang commented on SPARK-24300:
-

I will fix this issue.

> generateLDAData in ml.cluster.LDASuite didn't set seed correctly
> 
>
> Key: SPARK-24300
> URL: https://issues.apache.org/jira/browse/SPARK-24300
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Xiangrui Meng
>Assignee: Lu Wang
>Priority: Minor
>
> [https://github.com/apache/spark/blob/0d63ebd17df747fb41d7ba254718bb7af3ae/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala]
>  
> generateLDAData uses the same RNG in all partitions to generate random data. 
> This either causes duplicate rows in cluster mode or non-deterministic behavior 
> in local mode:
> {code:java}
> scala> val rng = new java.util.Random(10)
> rng: java.util.Random = java.util.Random@78c5ef58
> scala> sc.parallelize(1 to 10).map { i => Seq.fill(10)(rng.nextInt(10)) 
> }.collect().mkString("\n")
> res12: String =
> List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4)
> List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4)
> List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4)
> List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4)
> List(3, 9, 1, 8, 5, 0, 6, 3, 3, 8)
> List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4)
> List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4)
> List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4)
> List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4)
> List(3, 9, 1, 8, 5, 0, 6, 3, 3, 8){code}
> We should create one RNG per partition to make it safe.
>  
> cc: [~lu.DB] [~josephkb]
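
For illustration only (not from the report above), a minimal PySpark sketch of the per-partition seeding idea; the actual fix belongs in the Scala test suite, and the row shape here is a placeholder.
{code:python}
import random

def gen_rows(partition_index, _rows, base_seed=10, n_cols=10):
    # One RNG per partition: mixing the partition index into the seed keeps the
    # run reproducible while avoiding identical rows across partitions.
    rng = random.Random(base_seed + partition_index)
    yield [rng.randint(0, 9) for _ in range(n_cols)]

# e.g. sc.parallelize(range(10), 10).mapPartitionsWithIndex(gen_rows).collect()
{code}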



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24366) Improve error message for Catalyst type converters

2018-05-25 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-24366.
-
   Resolution: Fixed
 Assignee: Maxim Gekk
Fix Version/s: 2.4.0

> Improve error message for Catalyst type converters
> --
>
> Key: SPARK-24366
> URL: https://issues.apache.org/jira/browse/SPARK-24366
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 1.6.3, 2.3.0
>Reporter: Maxim Gekk
>Assignee: Maxim Gekk
>Priority: Minor
> Fix For: 2.4.0
>
>
> Users have no way to drill down to understand which of the hundreds of fields 
> in the millions of records feeding into the job is causing the problem. We 
> should show where in the schema the error is happening.
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 4 in stage 344.0 failed 4 times, most recent failure: Lost task 4.3 in 
> stage 344.0 (TID 2673, ip-10-31-237-248.ec2.internal): scala.MatchError: 
> start (of class java.lang.String)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$1.apply(CatalystTypeConverters.scala:161)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:161)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:153)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$1.apply(CatalystTypeConverters.scala:161)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:161)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:153)
>   at 
> org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
>   at 
> 

[jira] [Commented] (SPARK-23455) Default Params in ML should be saved separately

2018-05-25 Thread Joseph K. Bradley (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491354#comment-16491354
 ] 

Joseph K. Bradley commented on SPARK-23455:
---

Yep, thanks [~viirya] for answering!  It will affect R only if we add ways for 
people to write custom ML Transformers and Estimators in R.

> Default Params in ML should be saved separately
> ---
>
> Key: SPARK-23455
> URL: https://issues.apache.org/jira/browse/SPARK-23455
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.4.0
>Reporter: Liang-Chi Hsieh
>Assignee: Liang-Chi Hsieh
>Priority: Major
> Fix For: 2.4.0
>
>
> We save ML's user-supplied params and default params as one entity in JSON. 
> When loading the saved models, we set all the loaded params on the created ML 
> model instances as user-supplied params.
> This causes some problems, e.g., if we strictly disallow some params to be set 
> at the same time, a default param can fail the param check because it is 
> treated as a user-supplied param after loading.
> The loaded default params should not be set as user-supplied params. We 
> should save ML default params separately in JSON.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24369) A bug when having multiple distinct aggregations

2018-05-25 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491342#comment-16491342
 ] 

Xiao Li commented on SPARK-24369:
-

Thanks!

> A bug when having multiple distinct aggregations
> 
>
> Key: SPARK-24369
> URL: https://issues.apache.org/jira/browse/SPARK-24369
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Priority: Major
>
> {code}
> SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*) FROM
> (VALUES
>(1, 1),
>(2, 2),
>(2, 2)
> ) t(x, y)
> {code}
> It returns 
> {code}
> java.lang.RuntimeException
> You hit a query analyzer bug. Please report your query to Spark user mailing 
> list.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24122) Allow automatic driver restarts on K8s

2018-05-25 Thread Yinan Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491338#comment-16491338
 ] 

Yinan Li commented on SPARK-24122:
--

The operator does cover automatic restart of an application with a configurable 
restart policy. For batch ETL jobs, this is probably sufficient for common 
needs to restart jobs on failures. For streaming jobs, checkpointing is needed. 
https://issues.apache.org/jira/browse/SPARK-23980 is also relevant. 

> Allow automatic driver restarts on K8s
> --
>
> Key: SPARK-24122
> URL: https://issues.apache.org/jira/browse/SPARK-24122
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: Oz Ben-Ami
>Priority: Minor
>
> [~foxish]
> Right now SparkSubmit creates the driver as a bare pod, rather than a managed 
> controller like a Deployment or a StatefulSet. This means there is no way to 
> guarantee automatic restarts, eg in case a node has an issue. Note Pod 
> RestartPolicy does not apply if a node fails. A StatefulSet would allow us to 
> guarantee that, and keep the ability for executors to find the driver using 
> DNS.
> This is particularly helpful for long-running streaming workloads, where we 
> currently use {{yarn.resourcemanager.am.max-attempts}} with YARN. I can 
> confirm that Spark Streaming and Structured Streaming applications can be 
> made to recover from such a restart, with the help of checkpointing. The 
> executors will have to be started again by the driver, but this should not be 
> a problem.
> For batch processing, we could alternatively use Kubernetes {{Job}} objects, 
> which restart pods on failure but not success. For example, note the 
> semantics provided by the {{kubectl run}} 
> [command|https://kubernetes.io/docs/reference/generated/kubectl/kubectl-commands#run]
>  * {{--restart=Never}}: bare Pod
>  * {{--restart=Always}}: Deployment
>  * {{--restart=OnFailure}}: Job
> https://github.com/apache-spark-on-k8s/spark/issues/288
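
For illustration only (not part of the issue text), a minimal PySpark sketch of the checkpoint-based recovery the description relies on; the rate source, file sink, and paths are placeholders.
{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("restart-recovery-demo").getOrCreate()

# Toy source; with a checkpoint location, a restarted driver resumes the query
# from the recorded offsets instead of reprocessing from scratch.
stream = spark.readStream.format("rate").load()

query = (stream.writeStream
               .format("parquet")                                       # file sink supports recovery
               .option("path", "/tmp/restart-demo-out")                 # placeholder output path
               .option("checkpointLocation", "/tmp/restart-demo-ckpt")  # placeholder checkpoint path
               .start())
{code}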



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24395) Fix Behavior of NOT IN with Literals Containing NULL

2018-05-25 Thread Miles Yucht (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Miles Yucht updated SPARK-24395:

Description: 
Spark does not return the correct answer when evaluating NOT IN in some cases. 
For example:
{code:java}
CREATE TEMPORARY VIEW m AS SELECT * FROM VALUES
  (null, null)
  AS m(a, b);

SELECT *
FROM   m
WHERE  a IS NULL AND b IS NULL
   AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1))));{code}
According to the semantics of null-aware anti-join, this should return no rows. 
However, it actually returns the row {{NULL NULL}}. This was found by 
inspecting the unit tests added for SPARK-24381 
([https://github.com/apache/spark/pull/21425#pullrequestreview-123421822]).

*Acceptance Criteria*:
 * We should be able to add the following test cases back to 
{{subquery/in-subquery/not-in-unit-test-multi-column-literal.sql}}:
{code:java}
  -- Case 3
  -- (probe-side columns are all null -> row not returned)
SELECT *
FROM   m
WHERE  a IS NULL AND b IS NULL -- Matches only (null, null)
   AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1))));

  -- Case 4
  -- (one column null, other column matches a row in the subquery result -> row 
not returned)
SELECT *
FROM   m
WHERE  b = 1.0 -- Matches (null, 1.0)
   AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1)))); 
{code}

 

cc [~smilegator] [~juliuszsompolski]

  was:
Spark does not return the correct answer when evaluating NOT IN in some cases. 
For example:
{code:java}
CREATE TEMPORARY VIEW m AS SELECT * FROM VALUES
  (null, null)
  AS m(a, b);

SELECT *
FROM   m
WHERE  a IS NULL AND b IS NULL
   AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1))));{code}
According to the semantics of null-aware anti-join, this should return no rows. 
However, it actually returns the row {{NULL NULL}}. This was found by 
inspecting the unit tests added for SPARK-24381 
([https://github.com/apache/spark/pull/21425#pullrequestreview-123421822]).

cc [~smilegator] [~juliuszsompolski]


> Fix Behavior of NOT IN with Literals Containing NULL
> 
>
> Key: SPARK-24395
> URL: https://issues.apache.org/jira/browse/SPARK-24395
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.2
>Reporter: Miles Yucht
>Priority: Major
>
> Spark does not return the correct answer when evaluating NOT IN in some 
> cases. For example:
> {code:java}
> CREATE TEMPORARY VIEW m AS SELECT * FROM VALUES
>   (null, null)
>   AS m(a, b);
> SELECT *
> FROM   m
> WHERE  a IS NULL AND b IS NULL
>AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1))));{code}
> According to the semantics of null-aware anti-join, this should return no 
> rows. However, it actually returns the row {{NULL NULL}}. This was found by 
> inspecting the unit tests added for SPARK-24381 
> ([https://github.com/apache/spark/pull/21425#pullrequestreview-123421822]).
> *Acceptance Criteria*:
>  * We should be able to add the following test cases back to 
> {{subquery/in-subquery/not-in-unit-test-multi-column-literal.sql}}:
> {code:java}
>   -- Case 3
>   -- (probe-side columns are all null -> row not returned)
> SELECT *
> FROM   m
> WHERE  a IS NULL AND b IS NULL -- Matches only (null, null)
>AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1))));
>   -- Case 4
>   -- (one column null, other column matches a row in the subquery result -> 
> row not returned)
> SELECT *
> FROM   m
> WHERE  b = 1.0 -- Matches (null, 1.0)
>AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1)))); 
> {code}
>  
> cc [~smilegator] [~juliuszsompolski]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24300) generateLDAData in ml.cluster.LDASuite didn't set seed correctly

2018-05-25 Thread Joseph K. Bradley (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph K. Bradley updated SPARK-24300:
--
Shepherd: Joseph K. Bradley

> generateLDAData in ml.cluster.LDASuite didn't set seed correctly
> 
>
> Key: SPARK-24300
> URL: https://issues.apache.org/jira/browse/SPARK-24300
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Xiangrui Meng
>Assignee: Lu Wang
>Priority: Minor
>
> [https://github.com/apache/spark/blob/0d63ebd17df747fb41d7ba254718bb7af3ae/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala]
>  
> generateLDAData uses the same RNG in all partitions to generate random data. 
> This either causes duplicate rows in cluster mode or non-deterministic behavior 
> in local mode:
> {code:java}
> scala> val rng = new java.util.Random(10)
> rng: java.util.Random = java.util.Random@78c5ef58
> scala> sc.parallelize(1 to 10).map { i => Seq.fill(10)(rng.nextInt(10)) 
> }.collect().mkString("\n")
> res12: String =
> List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4)
> List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4)
> List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4)
> List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4)
> List(3, 9, 1, 8, 5, 0, 6, 3, 3, 8)
> List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4)
> List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4)
> List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4)
> List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4)
> List(3, 9, 1, 8, 5, 0, 6, 3, 3, 8){code}
> We should create one RNG per partition to make it safe.
>  
> cc: [~lu.DB] [~josephkb]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-6235) Address various 2G limits

2018-05-25 Thread Marcelo Vanzin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin reassigned SPARK-6235:
-

Assignee: Marcelo Vanzin

> Address various 2G limits
> -
>
> Key: SPARK-6235
> URL: https://issues.apache.org/jira/browse/SPARK-6235
> Project: Spark
>  Issue Type: Umbrella
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Assignee: Marcelo Vanzin
>Priority: Major
> Attachments: SPARK-6235_Design_V0.02.pdf
>
>
> An umbrella ticket to track the various 2G limit we have in Spark, due to the 
> use of byte arrays and ByteBuffers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-6235) Address various 2G limits

2018-05-25 Thread Marcelo Vanzin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin reassigned SPARK-6235:
-

Assignee: (was: Marcelo Vanzin)

> Address various 2G limits
> -
>
> Key: SPARK-6235
> URL: https://issues.apache.org/jira/browse/SPARK-6235
> Project: Spark
>  Issue Type: Umbrella
>  Components: Shuffle, Spark Core
>Reporter: Reynold Xin
>Priority: Major
> Attachments: SPARK-6235_Design_V0.02.pdf
>
>
> An umbrella ticket to track the various 2G limit we have in Spark, due to the 
> use of byte arrays and ByteBuffers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24395) Fix Behavior of NOT IN with Literals Containing NULL

2018-05-25 Thread Miles Yucht (JIRA)
Miles Yucht created SPARK-24395:
---

 Summary: Fix Behavior of NOT IN with Literals Containing NULL
 Key: SPARK-24395
 URL: https://issues.apache.org/jira/browse/SPARK-24395
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.2
Reporter: Miles Yucht


Spark does not return the correct answer when evaluating NOT IN in some cases. 
For example:
{code:java}
CREATE TEMPORARY VIEW m AS SELECT * FROM VALUES
  (null, null)
  AS m(a, b);

SELECT *
FROM   m
WHERE  a IS NULL AND b IS NULL
   AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1))));{code}
According to the semantics of null-aware anti-join, this should return no rows. 
However, it actually returns the row {{NULL NULL}}. This was found by 
inspecting the unit tests added for SPARK-24381 
([https://github.com/apache/spark/pull/21425#pullrequestreview-123421822]).

cc [~smilegator] [~juliuszsompolski]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24392) Mark pandas_udf as Experimental

2018-05-25 Thread Marcelo Vanzin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin updated SPARK-24392:
---
Target Version/s: 2.3.1

> Mark pandas_udf as Experimental
> ---
>
> Key: SPARK-24392
> URL: https://issues.apache.org/jira/browse/SPARK-24392
> Project: Spark
>  Issue Type: Task
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Bryan Cutler
>Priority: Blocker
> Fix For: 2.3.1
>
>
> This functionality is still evolving and has introduced some bugs .  It was 
> an oversight to not mark it as experimental before it was released in 2.3.0.  
> Not sure if it is a good idea to change this after the fact, but I'm opening 
> this to continue discussion from 
> https://github.com/apache/spark/pull/21427#issuecomment-391967423



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after re-analyzing the plans

2018-05-25 Thread Li Jin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491309#comment-16491309
 ] 

Li Jin commented on SPARK-24373:


[~smilegator] do you mean that adding AnalysisBarrier to RelationalGroupedDataset 
and KeyValueGroupedDataset could lead to new bugs?

> "df.cache() df.count()" no longer eagerly caches data when the analyzed plans 
> are different after re-analyzing the plans
> 
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Blocker
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> in Spark 2.2, you could see it prints "". 
> In the above example, when you do explain, you could see:
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain():
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24392) Mark pandas_udf as Experimental

2018-05-25 Thread Bryan Cutler (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491305#comment-16491305
 ] 

Bryan Cutler edited comment on SPARK-24392 at 5/25/18 9:53 PM:
---

Targeting 2.3.1, will try to resolve today to not hold up the release.


was (Author: bryanc):
Targeting 2.3.1

> Mark pandas_udf as Experimental
> ---
>
> Key: SPARK-24392
> URL: https://issues.apache.org/jira/browse/SPARK-24392
> Project: Spark
>  Issue Type: Task
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Bryan Cutler
>Priority: Blocker
> Fix For: 2.3.1
>
>
> This functionality is still evolving and has introduced some bugs .  It was 
> an oversight to not mark it as experimental before it was released in 2.3.0.  
> Not sure if it is a good idea to change this after the fact, but I'm opening 
> this to continue discussion from 
> https://github.com/apache/spark/pull/21427#issuecomment-391967423



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24392) Mark pandas_udf as Experimental

2018-05-25 Thread Marcelo Vanzin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491308#comment-16491308
 ] 

Marcelo Vanzin commented on SPARK-24392:


(There's a target version field for that, btw. Updating it...)

> Mark pandas_udf as Experimental
> ---
>
> Key: SPARK-24392
> URL: https://issues.apache.org/jira/browse/SPARK-24392
> Project: Spark
>  Issue Type: Task
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Bryan Cutler
>Priority: Blocker
> Fix For: 2.3.1
>
>
> This functionality is still evolving and has introduced some bugs .  It was 
> an oversight to not mark it as experimental before it was released in 2.3.0.  
> Not sure if it is a good idea to change this after the fact, but I'm opening 
> this to continue discussion from 
> https://github.com/apache/spark/pull/21427#issuecomment-391967423



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24392) Mark pandas_udf as Experimental

2018-05-25 Thread Bryan Cutler (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491305#comment-16491305
 ] 

Bryan Cutler commented on SPARK-24392:
--

Targeting 2.3.1

> Mark pandas_udf as Experimental
> ---
>
> Key: SPARK-24392
> URL: https://issues.apache.org/jira/browse/SPARK-24392
> Project: Spark
>  Issue Type: Task
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Bryan Cutler
>Priority: Blocker
> Fix For: 2.3.1
>
>
> This functionality is still evolving and has introduced some bugs .  It was 
> an oversight to not mark it as experimental before it was released in 2.3.0.  
> Not sure if it is a good idea to change this after the fact, but I'm opening 
> this to continue discussion from 
> https://github.com/apache/spark/pull/21427#issuecomment-391967423



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24392) Mark pandas_udf as Experimental

2018-05-25 Thread Bryan Cutler (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bryan Cutler updated SPARK-24392:
-
Fix Version/s: 2.3.1

> Mark pandas_udf as Experimental
> ---
>
> Key: SPARK-24392
> URL: https://issues.apache.org/jira/browse/SPARK-24392
> Project: Spark
>  Issue Type: Task
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Bryan Cutler
>Priority: Blocker
> Fix For: 2.3.1
>
>
> This functionality is still evolving and has introduced some bugs .  It was 
> an oversight to not mark it as experimental before it was released in 2.3.0.  
> Not sure if it is a good idea to change this after the fact, but I'm opening 
> this to continue discussion from 
> https://github.com/apache/spark/pull/21427#issuecomment-391967423



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after re-analyzing the plans

2018-05-25 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491304#comment-16491304
 ] 

Xiao Li commented on SPARK-24373:
-

In the above example, each time we re-analyze the plan that is recreated 
through the Dataset APIs count(), groupBy(), rollup(), cube(), pivot() and 
groupByKey(), the Analyzer rule HandleNullInputsForUDF will add an extra 
IF expression above the UDF in the previously resolved sub-plan. Note, this is 
not the only rule that could change the analyzed plans if we re-run the 
analyzer.

This is a regression introduced by 
[https://github.com/apache/spark/pull/17770]. We replaced the original solution 
(based on the analyzed flag) with the AnalysisBarrier. However, we did not add 
the AnalysisBarrier to the APIs of RelationalGroupedDataset and 
KeyValueGroupedDataset.

To fix it, we will change the plans again, and we might face some unknown issues. 
How about adding a temporary flag in Spark 2.3.1? If anything unexpected 
happens, our users can still switch back to the Spark 2.3.0 behavior.

> "df.cache() df.count()" no longer eagerly caches data when the analyzed plans 
> are different after re-analyzing the plans
> 
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Blocker
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> in Spark 2.2, you could see it prints "". 
> In the above example, when you do explain, you could see:
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain():
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24394) Nodes in decision tree sometimes have negative impurity values

2018-05-25 Thread Barry Becker (JIRA)
Barry Becker created SPARK-24394:


 Summary: Nodes in decision tree sometimes have negative impurity 
values
 Key: SPARK-24394
 URL: https://issues.apache.org/jira/browse/SPARK-24394
 Project: Spark
  Issue Type: Bug
  Components: ML
Affects Versions: 2.3.0
 Environment: Spark 2.3.0

ML

linux
Reporter: Barry Becker


After doing some reading about gini- and entropy-based impurity (see 
[https://spark.apache.org/docs/2.2.0/mllib-decision-tree.html]), it seems that 
impurity values should always be bounded by 0 and 1. However, sometimes some 
leaf nodes (usually, but not always, those with the minimum number of records) 
have negative impurity values (usually -1, but not always). This seems like a bug 
in the impurity calculation, but I am not sure. It happens for both gini and 
entropy impurity, at slightly different nodes. 
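
For reference (not part of the original report): with class proportions p_1, ..., p_K at a node,

Gini(p) = 1 - \sum_{i=1}^{K} p_i^2 \in [0, 1 - 1/K],    Entropy(p) = -\sum_{i=1}^{K} p_i \log_2 p_i \in [0, \log_2 K],

so both measures are non-negative by construction; a negative node value therefore points at the impurity computation or aggregation, not at the data.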

I can reproduce this with almost any dataset using pretty standard parameters 
like the following:

new DecisionTreeClassifier()
 .setLabelCol(targetName)
 .setMaxBins(100)
 .setMaxDepth(5)
 .setMinInfoGain(0.01)
 .setMinInstancesPerNode(5)

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23576) SparkSQL - Decimal data missing decimal point

2018-05-25 Thread Hafthor Stefansson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491267#comment-16491267
 ] 

Hafthor Stefansson edited comment on SPARK-23576 at 5/25/18 9:27 PM:
-

Here's an equivalent problem:

spark.sql("select cast(1 as decimal(38,18)) as 
x").write.format("parquet").save("decimal.parq")

spark.read.schema(spark.sql("select cast(1 as decimal) as 
x").schema).parquet("decimal.parq").show

returns 100!

It should throw, like it would if I specified a schema with x as float, or some 
other type.

Or maybe do what double casting would have done:

spark.sql("select cast(cast(1 as decimal(38,10)) as decimal(38,18)) as x").show

returns 1.00

except, I'd be worried about getting nulls when exceeding the range

spark.sql("select cast(cast(10 as decimal(2,0)) as decimal(2,1)) as x").show

returns null!

[https://gist.github.com/Hafthor/7f12bdfc41dc96676df03f366ef76f1c]


was (Author: hafthor):
Here's an equivalent problem:

spark.sql("select cast(1 as decimal(38,18)) as 
x").write.format("parquet").save("decimal.parq")

spark.read.schema(spark.sql("select cast(1 as decimal) as 
x").schema).parquet("decimal.parq").show

returns 100!

It should throw, like it would if I specified a schema with x as float, or some 
other type.

Or maybe do what double casting would have

spark.sql("select cast(cast(1 as decimal(38,10)) as decimal(38,18)) as x").show

returns 1.00

spark.sql("select cast(cast(10 as decimal(2,0)) as decimal(2,1)) as x").show

returns null!

[https://gist.github.com/Hafthor/7f12bdfc41dc96676df03f366ef76f1c]

> SparkSQL - Decimal data missing decimal point
> -
>
> Key: SPARK-23576
> URL: https://issues.apache.org/jira/browse/SPARK-23576
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
> Environment: spark 2.3.0
> linux
>Reporter: R
>Priority: Major
>
> Integers like 3 stored as a decimal display in sparksql as 300 with 
> no decimal point. But hive displays fine as 3.
> Repro steps:
>  # Create a .csv with the value 3
>  # Use spark to read the csv, cast it as decimal(31,8) and output to an ORC 
> file
>  # Use spark to read the ORC, infer the schema (it will infer 38,18 
> precision) and output to a Parquet file
>  # Create external hive table to read the parquet ( define the hive type as 
> decimal(31,8))
>  # Use spark-sql to select from the external hive table.
>  # Notice how sparksql shows 300    !!!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23576) SparkSQL - Decimal data missing decimal point

2018-05-25 Thread Hafthor Stefansson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491267#comment-16491267
 ] 

Hafthor Stefansson edited comment on SPARK-23576 at 5/25/18 9:26 PM:
-

Here's an equivalent problem:

spark.sql("select cast(1 as decimal(38,18)) as 
x").write.format("parquet").save("decimal.parq")

spark.read.schema(spark.sql("select cast(1 as decimal) as 
x").schema).parquet("decimal.parq").show

returns 100!

It should throw, like it would if I specified a schema with x as float, or some 
other type.

Or maybe do what double casting would have

spark.sql("select cast(cast(1 as decimal(38,10)) as decimal(38,18)) as x").show

returns 1.00

spark.sql("select cast(cast(10 as decimal(2,0)) as decimal(2,1)) as x").show

returns null!

[https://gist.github.com/Hafthor/7f12bdfc41dc96676df03f366ef76f1c]


was (Author: hafthor):
Here's an equivalent problem:

spark.sql("select cast(1 as decimal(38,18)) as 
x").write.format("parquet").save("decimal.parq")

spark.read.schema(spark.sql("select cast(1 as decimal) as 
x").schema).parquet("decimal.parq").show

returns 100!

It should throw, like it would if I specified a schema with x as float, or some 
other type.

Or maybe do what double casting would have

spark.sql("select cast(cast(1 as decimal(38,10)) as decimal(38,18)) as x").show

return 1.00

[https://gist.github.com/Hafthor/7f12bdfc41dc96676df03f366ef76f1c]

> SparkSQL - Decimal data missing decimal point
> -
>
> Key: SPARK-23576
> URL: https://issues.apache.org/jira/browse/SPARK-23576
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
> Environment: spark 2.3.0
> linux
>Reporter: R
>Priority: Major
>
> Integers like 3 stored as a decimal display in sparksql as 300 with 
> no decimal point. But hive displays fine as 3.
> Repro steps:
>  # Create a .csv with the value 3
>  # Use spark to read the csv, cast it as decimal(31,8) and output to an ORC 
> file
>  # Use spark to read the ORC, infer the schema (it will infer 38,18 
> precision) and output to a Parquet file
>  # Create external hive table to read the parquet ( define the hive type as 
> decimal(31,8))
>  # Use spark-sql to select from the external hive table.
>  # Notice how sparksql shows 300    !!!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24093) Make some fields of KafkaStreamWriter/InternalRowMicroBatchWriter visible to outside of the classes

2018-05-25 Thread Mingjie Tang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491285#comment-16491285
 ] 

Mingjie Tang commented on SPARK-24093:
--

I can add a PR for this.

> Make some fields of KafkaStreamWriter/InternalRowMicroBatchWriter visible to 
> outside of the classes
> ---
>
> Key: SPARK-24093
> URL: https://issues.apache.org/jira/browse/SPARK-24093
> Project: Spark
>  Issue Type: Wish
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Weiqing Yang
>Priority: Minor
>
> To make third parties able to get the information of streaming writer, for 
> example, the information of "writer" and "topic" which streaming data are 
> written into, this jira is created to make relevant fields of 
> KafkaStreamWriter and InternalRowMicroBatchWriter visible to outside of the 
> classes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23887) update query progress

2018-05-25 Thread Arun Mahadevan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491281#comment-16491281
 ] 

Arun Mahadevan commented on SPARK-23887:


We could probably invoke "ProgressReporter.finishTrigger" after each epoch. 
This would update the query execution stats and post StreamingQueryListener 
events. However, I am not clear on how the SQL metrics could be updated, since they 
rely on accumulators and the accumulators might not be updated until the task 
completes. We could probably post a special event to the DAGScheduler to 
update the accumulators, or we may need to figure out some other mechanism.

[~joseph.torres] [~tdas] , what do you think?
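
For reference, a hedged sketch (not from this thread) of the listener side that would consume such events; registering a {{StreamingQueryListener}} via {{spark.streams.addListener}} is how users observe query progress today.

{code:java}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// A minimal listener that just prints each progress update it receives.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"progress: ${event.progress.json}")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})
{code}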

> update query progress
> -
>
> Key: SPARK-23887
> URL: https://issues.apache.org/jira/browse/SPARK-23887
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24091) Internally used ConfigMap prevents use of user-specified ConfigMaps carrying Spark configs files

2018-05-25 Thread Yinan Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491279#comment-16491279
 ] 

Yinan Li commented on SPARK-24091:
--

Thanks [~tmckay]! I think the first approach is a good way of handling override 
and customization. 

> Internally used ConfigMap prevents use of user-specified ConfigMaps carrying 
> Spark configs files
> 
>
> Key: SPARK-24091
> URL: https://issues.apache.org/jira/browse/SPARK-24091
> Project: Spark
>  Issue Type: Brainstorming
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Yinan Li
>Priority: Major
>
> The recent PR [https://github.com/apache/spark/pull/20669] for removing the 
> init-container introduced an internally used ConfigMap carrying Spark 
> configuration properties in a file for the driver. This ConfigMap gets 
> mounted under {{$SPARK_HOME/conf}} and the environment variable 
> {{SPARK_CONF_DIR}} is set to point to the mount path. This pretty much 
> prevents users from mounting their own ConfigMaps that carry custom Spark 
> configuration files, e.g., {{log4j.properties}} and {{spark-env.sh}} and 
> leaves users with only the option of building custom images. IMO, it is very 
> useful to support mounting user-specified ConfigMaps for custom Spark 
> configuration files. This warrants further discussion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23576) SparkSQL - Decimal data missing decimal point

2018-05-25 Thread Hafthor Stefansson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491267#comment-16491267
 ] 

Hafthor Stefansson edited comment on SPARK-23576 at 5/25/18 9:14 PM:
-

Here's an equivalent problem:

spark.sql("select cast(1 as decimal(38,18)) as 
x").write.format("parquet").save("decimal.parq")

spark.read.schema(spark.sql("select cast(1 as decimal) as 
x").schema).parquet("decimal.parq").show

returns 100!

It should throw, like it would if I specified a schema with x as float, or some 
other type.

Or maybe do what double casting would have

spark.sql("select cast(cast(1 as decimal(38,10)) as decimal(38,18)) as x").show

return 1.00

[https://gist.github.com/Hafthor/7f12bdfc41dc96676df03f366ef76f1c]


was (Author: hafthor):
Here's an equivalent problem:

spark.sql("select cast(1 as decimal(38,18)) as 
x").write.format("parquet").save("decimal.parq")

spark.read.schema(spark.sql("select cast(1 as decimal) as 
x").schema).parquet("decimal.parq").show

returns 100!

It should throw, like it would if I specified a schema with x as float, or some 
other type.

https://gist.github.com/Hafthor/7f12bdfc41dc96676df03f366ef76f1c

> SparkSQL - Decimal data missing decimal point
> -
>
> Key: SPARK-23576
> URL: https://issues.apache.org/jira/browse/SPARK-23576
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
> Environment: spark 2.3.0
> linux
>Reporter: R
>Priority: Major
>
> Integers like 3 stored as a decimal display in sparksql as 300 with 
> no decimal point. But hive displays fine as 3.
> Repro steps:
>  # Create a .csv with the value 3
>  # Use spark to read the csv, cast it as decimal(31,8) and output to an ORC 
> file
>  # Use spark to read the ORC, infer the schema (it will infer 38,18 
> precision) and output to a Parquet file
>  # Create external hive table to read the parquet ( define the hive type as 
> decimal(31,8))
>  # Use spark-sql to select from the external hive table.
>  # Notice how sparksql shows 300    !!!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after re-analyzing the plans

2018-05-25 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491271#comment-16491271
 ] 

Marco Gaido commented on SPARK-24373:
-

[~smilegator] Yes, you're right; the impact would definitely be lower.

> "df.cache() df.count()" no longer eagerly caches data when the analyzed plans 
> are different after re-analyzing the plans
> 
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Blocker
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> in Spark 2.2, you could see it prints "". 
> In the above example, when you run explain(), you could see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23576) SparkSQL - Decimal data missing decimal point

2018-05-25 Thread Hafthor Stefansson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491267#comment-16491267
 ] 

Hafthor Stefansson edited comment on SPARK-23576 at 5/25/18 9:06 PM:
-

Here's an equivalent problem:

spark.sql("select cast(1 as decimal(38,18)) as 
x").write.format("parquet").save("decimal.parq")

spark.read.schema(spark.sql("select cast(1 as decimal) as 
x").schema).parquet("decimal.parq").show

returns 100!

It should throw, like it would if I specified a schema with x as float, or some 
other type.

https://gist.github.com/Hafthor/7f12bdfc41dc96676df03f366ef76f1c


was (Author: hafthor):
Here's an equivalent problem:

spark.sql("select cast(1 as decimal(38,18)) as 
x").write.format("parquet").save("decimal.parq")

spark.read.schema(spark.sql("select cast(1 as decimal) as 
x").schema).parquet("decimal.parq").show

returns 100!

It should throw, like it would if I specified a schema with x as float, or some 
other type.

> SparkSQL - Decimal data missing decimal point
> -
>
> Key: SPARK-23576
> URL: https://issues.apache.org/jira/browse/SPARK-23576
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
> Environment: spark 2.3.0
> linux
>Reporter: R
>Priority: Major
>
> Integers like 3 stored as a decimal display in sparksql as 300 with 
> no decimal point. But hive displays fine as 3.
> Repro steps:
>  # Create a .csv with the value 3
>  # Use spark to read the csv, cast it as decimal(31,8) and output to an ORC 
> file
>  # Use spark to read the ORC, infer the schema (it will infer 38,18 
> precision) and output to a Parquet file
>  # Create external hive table to read the parquet ( define the hive type as 
> decimal(31,8))
>  # Use spark-sql to select from the external hive table.
>  # Notice how sparksql shows 300    !!!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23576) SparkSQL - Decimal data missing decimal point

2018-05-25 Thread Hafthor Stefansson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491267#comment-16491267
 ] 

Hafthor Stefansson commented on SPARK-23576:


Here's an equivalent problem:

spark.sql("select cast(1 as decimal(38,18)) as 
x").write.format("parquet").save("decimal.parq")

spark.read.schema(spark.sql("select cast(1 as decimal) as 
x").schema).parquet("decimal.parq").show

returns 100!

It should throw, like it would if I specified a schema with x as float, or some 
other type.

> SparkSQL - Decimal data missing decimal point
> -
>
> Key: SPARK-23576
> URL: https://issues.apache.org/jira/browse/SPARK-23576
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
> Environment: spark 2.3.0
> linux
>Reporter: R
>Priority: Major
>
> Integers like 3 stored as a decimal display in sparksql as 300 with 
> no decimal point. But hive displays fine as 3.
> Repro steps:
>  # Create a .csv with the value 3
>  # Use spark to read the csv, cast it as decimal(31,8) and output to an ORC 
> file
>  # Use spark to read the ORC, infer the schema (it will infer 38,18 
> precision) and output to a Parquet file
>  # Create external hive table to read the parquet ( define the hive type as 
> decimal(31,8))
>  # Use spark-sql to select from the external hive table.
>  # Notice how sparksql shows 300    !!!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after re-analyzing the plans

2018-05-25 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-24373:

Summary: "df.cache() df.count()" no longer eagerly caches data when the 
analyzed plans are different after re-analyzing the plans  (was: "df.cache() 
df.count()" no longer eagerly caches data when the analyzed plans are different 
after rerunning the analyzer)

> "df.cache() df.count()" no longer eagerly caches data when the analyzed plans 
> are different after re-analyzing the plans
> 
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Blocker
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> in Spark 2.2, you could see it prints "". 
> In the above example, when you run explain(), you could see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23309) Spark 2.3 cached query performance 20-30% worse then spark 2.2

2018-05-25 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491233#comment-16491233
 ] 

Xiao Li commented on SPARK-23309:
-

[~vanzin] https://issues.apache.org/jira/browse/SPARK-24373 is not related to 
this JIRA. This JIRA uses pure SQL and thus will not hit the problem 
caused by AnalysisBarrier.

> Spark 2.3 cached query performance 20-30% worse then spark 2.2
> --
>
> Key: SPARK-23309
> URL: https://issues.apache.org/jira/browse/SPARK-23309
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Thomas Graves
>Priority: Major
>
> I was testing spark 2.3 rc2 and I am seeing a performance regression in sql 
> queries on cached data.
> The size of the data: 10.4GB input from hive orc files /18.8 GB cached/5592 
> partitions
> Here is the example query:
> val dailycached = spark.sql("select something from table where dt = 
> '20170301' AND something IS NOT NULL")
> dailycached.createOrReplaceTempView("dailycached") 
> spark.catalog.cacheTable("dailyCached")
> spark.sql("SELECT COUNT(DISTINCT(something)) from dailycached").show()
>  
> On spark 2.2 I see queries times average 13 seconds
> On the same nodes I see spark 2.3 queries times average 17 seconds
> Note these are times of queries after the initial caching, so just running 
> the last line again: 
> spark.sql("SELECT COUNT(DISTINCT(something)) from dailycached").show() 
> multiple times.
>  
> I also ran a query over more data (335GB input/587.5 GB cached) and saw a 
> similar discrepancy in the performance of querying cached data between spark 
> 2.3 and spark 2.2, where 2.2 was better by about 20%.  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after rerunning the analyzer

2018-05-25 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-24373:

Summary: "df.cache() df.count()" no longer eagerly caches data when the 
analyzed plans are different after rerunning the analyzer  (was: "df.cache() 
df.count()" no longer eagerly caches data)

> "df.cache() df.count()" no longer eagerly caches data when the analyzed plans 
> are different after rerunning the analyzer
> 
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Blocker
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> in Spark 2.2, you could see it prints "". 
> In the above example, when you run explain(), you could see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data

2018-05-25 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491224#comment-16491224
 ] 

Xiao Li edited comment on SPARK-24373 at 5/25/18 8:24 PM:
--

{code}
  def count(): Long = withAction("count", groupBy().count().queryExecution) { 
plan =>
plan.executeCollect().head.getLong(0)
  }
{code}

Many Spark users call df.count() after df.cache() to achieve eager caching. 
Since our count() API goes through `groupBy()`, the impact becomes much 
bigger. The count() API will not trigger data materialization when the 
plans are different after multiple rounds of plan analysis. 
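
Not something proposed in this comment, just a hedged sketch of an alternative users sometimes fall back to: run an action on the cached Dataset's own plan, so that materialization does not depend on the re-analyzed {{groupBy().count()}} plan matching the cache entry. Whether this fully sidesteps the issue on 2.3.0 would need verification.

{code:java}
df1.cache()
// Executes df1's already-analyzed plan and fills the InMemoryRelation,
// independent of whether a derived aggregate plan later matches the cache.
df1.foreach(_ => ())
{code}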


was (Author: smilegator):
{code}
  def count(): Long = withAction("count", groupBy().count().queryExecution) { 
plan =>
plan.executeCollect().head.getLong(0)
  }
{code}

Many Spark users are using df.count() after df.cache() for achieving eager 
caching.  Since our count() API is using `groupBy()`, the impact becomes much 
bigger if the plans are different after multiple rounds of plan analysis. 

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Blocker
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> in Spark 2.2, you could see it prints "". 
> In the above example, when you run explain(), you could see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data

2018-05-25 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491224#comment-16491224
 ] 

Xiao Li edited comment on SPARK-24373 at 5/25/18 8:23 PM:
--

{code}
  def count(): Long = withAction("count", groupBy().count().queryExecution) { 
plan =>
plan.executeCollect().head.getLong(0)
  }
{code}

Many Spark users are using df.count() after df.cache() for achieving eager 
caching.  Since our count() API is using `groupBy()`, the impact becomes much 
bigger if the plans are different after multiple rounds of plan analysis. 


was (Author: smilegator):
{code}
  def count(): Long = withAction("count", groupBy().count().queryExecution) { 
plan =>
plan.executeCollect().head.getLong(0)
  }
{code}

Many Spark users are using df.count() after df.cache() for achieving eager 
caching.  Since our count() API is using `groupBy()`, the impact becomes much 
bigger. 

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Blocker
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> in Spark 2.2, you could see it prints "". 
> In the above example, when you run explain(), you could see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data

2018-05-25 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-24373:

Target Version/s: 2.3.1

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Blocker
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> in Spark 2.2, you could see it prints "". 
> In the above example, when you run explain(), you could see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data

2018-05-25 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-24373:

Priority: Blocker  (was: Major)

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Blocker
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> in Spark 2.2, you could see it prints "". 
> In the above example, when you run explain(), you could see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24004) Tests of from_json for MapType

2018-05-25 Thread Maxim Gekk (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maxim Gekk resolved SPARK-24004.

Resolution: Won't Fix

> Tests of from_json for MapType
> --
>
> Key: SPARK-24004
> URL: https://issues.apache.org/jira/browse/SPARK-24004
> Project: Spark
>  Issue Type: Test
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Maxim Gekk
>Priority: Trivial
>
> There are no tests for *from_json* that check *MapType* as a value type of 
> struct fields. The MapType should be supported as non-root type according to 
> current implementation of JacksonParser but the functionality is not checked.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data

2018-05-25 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491224#comment-16491224
 ] 

Xiao Li commented on SPARK-24373:
-

{code}
  def count(): Long = withAction("count", groupBy().count().queryExecution) { 
plan =>
plan.executeCollect().head.getLong(0)
  }
{code}

Many Spark users are using df.count() after df.cache() for achieving eager 
caching.  Since our count() API is using `groupBy()`, the impact becomes much 
bigger. 

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Major
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> in Spark 2.2, you could see it prints "". 
> In the above example, when you run explain(), you could see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-15125) CSV data source recognizes empty quoted strings in the input as null.

2018-05-25 Thread Maxim Gekk (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maxim Gekk resolved SPARK-15125.

   Resolution: Fixed
Fix Version/s: 2.4.0

The issue has been fixed by 
https://github.com/apache/spark/commit/7a2d4895c75d4c232c377876b61c05a083eab3c8

> CSV data source recognizes empty quoted strings in the input as null. 
> --
>
> Key: SPARK-15125
> URL: https://issues.apache.org/jira/browse/SPARK-15125
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Suresh Thalamati
>Priority: Major
> Fix For: 2.4.0
>
>
> CSV data source does not differentiate between empty quoted strings and empty 
> fields; both are read as null. In some scenarios users would want to differentiate 
> between these values, especially in the context of SQL, where NULL and empty string 
> have different meanings. If the input data happens to be a dump from a traditional 
> relational data source, users will see different results for the SQL queries. 
> {code}
> Repro:
> Test Data: (test.csv)
> year,make,model,comment,price
> 2017,Tesla,Mode 3,looks nice.,35000.99
> 2016,Chevy,Bolt,"",29000.00
> 2015,Porsche,"",,
> scala> val df= sqlContext.read.format("csv").option("header", 
> "true").option("inferSchema", "true").option("nullValue", 
> null).load("/tmp/test.csv")
> df: org.apache.spark.sql.DataFrame = [year: int, make: string ... 3 more 
> fields]
> scala> df.show
> ++---+--+---++
> |year|   make| model|comment|   price|
> ++---+--+---++
> |2017|  Tesla|Mode 3|looks nice.|35000.99|
> |2016|  Chevy|  Bolt|   null| 29000.0|
> |2015|Porsche|  null|   null|null|
> ++---+--+---++
> Expected:
> ++---+--+---++
> |year|   make| model|comment|   price|
> ++---+--+---++
> |2017|  Tesla|Mode 3|looks nice.|35000.99|
> |2016|  Chevy|  Bolt|   | 29000.0|
> |2015|Porsche|  |   null|null|
> ++---+--+---++
> {code}
> Testing a fix for the this issue. I will give a shot at submitting a PR for 
> this soon. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24393) SQL builtin: isinf

2018-05-25 Thread Henry Robinson (JIRA)
Henry Robinson created SPARK-24393:
--

 Summary: SQL builtin: isinf
 Key: SPARK-24393
 URL: https://issues.apache.org/jira/browse/SPARK-24393
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 2.4.0
Reporter: Henry Robinson


Along with the existing {{isnan}}, it would be helpful to have {{isinf}} to 
test if a float or double value is {{Infinity}}. 
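
A hedged sketch of what users do today in the absence of {{isinf}}; the column name {{x}}, the table {{t}}, and the DataFrame {{df}} are placeholders, not part of this proposal.

{code:java}
import org.apache.spark.sql.functions.{col, lit}

// SQL: compare against explicit infinity literals obtained via string casts.
spark.sql("SELECT x, x = cast('Infinity' AS double) OR x = cast('-Infinity' AS double) AS is_inf FROM t")

// DataFrame API equivalent; `df` with a double column `x` is a placeholder.
df.withColumn("is_inf",
  col("x") === lit(Double.PositiveInfinity) || col("x") === lit(Double.NegativeInfinity))
{code}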





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data

2018-05-25 Thread JIRA

[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491153#comment-16491153
 ] 

Tomasz Gawęda commented on SPARK-24373:
---

[~LI,Xiao] That is a good idea :) Eager caching is useful; many times I see an 
additional count() just to cache eagerly.

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Major
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> in Spark 2.2, you could see it prints "". 
> In the above example, when you run explain(), you could see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data

2018-05-25 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491124#comment-16491124
 ] 

Marco Gaido commented on SPARK-24373:
-

[~smilegator] I think an eager API is not related to the problem experienced 
here, though.

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Major
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(<function1>,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> In Spark 2.2, you could see it print "".
> In the above example, when you run explain(), you can see:
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain():
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24324) Pandas Grouped Map UserDefinedFunction mixes column labels

2018-05-25 Thread Li Jin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491125#comment-16491125
 ] 

Li Jin commented on SPARK-24324:


Moved under SPARK-22216 for better ticket organization.

> Pandas Grouped Map UserDefinedFunction mixes column labels
> --
>
> Key: SPARK-24324
> URL: https://issues.apache.org/jira/browse/SPARK-24324
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 2.3.0
> Environment: Python (using virtualenv):
> {noformat}
> $ python --version 
> Python 3.6.5
> {noformat}
> Modules installed:
> {noformat}
> arrow==0.12.1
> backcall==0.1.0
> bleach==2.1.3
> chardet==3.0.4
> decorator==4.3.0
> entrypoints==0.2.3
> findspark==1.2.0
> html5lib==1.0.1
> ipdb==0.11
> ipykernel==4.8.2
> ipython==6.3.1
> ipython-genutils==0.2.0
> ipywidgets==7.2.1
> jedi==0.12.0
> Jinja2==2.10
> jsonschema==2.6.0
> jupyter==1.0.0
> jupyter-client==5.2.3
> jupyter-console==5.2.0
> jupyter-core==4.4.0
> MarkupSafe==1.0
> mistune==0.8.3
> nbconvert==5.3.1
> nbformat==4.4.0
> notebook==5.5.0
> numpy==1.14.3
> pandas==0.22.0
> pandocfilters==1.4.2
> parso==0.2.0
> pbr==3.1.1
> pexpect==4.5.0
> pickleshare==0.7.4
> progressbar2==3.37.1
> prompt-toolkit==1.0.15
> ptyprocess==0.5.2
> pyarrow==0.9.0
> Pygments==2.2.0
> python-dateutil==2.7.2
> python-utils==2.3.0
> pytz==2018.4
> pyzmq==17.0.0
> qtconsole==4.3.1
> Send2Trash==1.5.0
> simplegeneric==0.8.1
> six==1.11.0
> SQLAlchemy==1.2.7
> stevedore==1.28.0
> terminado==0.8.1
> testpath==0.3.1
> tornado==5.0.2
> traitlets==4.3.2
> virtualenv==15.1.0
> virtualenv-clone==0.2.6
> virtualenvwrapper==4.7.2
> wcwidth==0.1.7
> webencodings==0.5.1
> widgetsnbextension==3.2.1
> {noformat}
>  
> Java:
> {noformat}
> $ java -version 
>  java version "1.8.0_171"
>  Java(TM) SE Runtime Environment (build 1.8.0_171-b11)
>  Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode){noformat}
> System:
> {noformat}
> $ lsb_release -a
> No LSB modules are available.
> Distributor ID:   Ubuntu
> Description:  Ubuntu 16.04.4 LTS
> Release:  16.04
> Codename: xenial
> {noformat}
>Reporter: Cristian Consonni
>Priority: Major
>
> I am working on Wikipedia page views (see [task T188041 on Wikimedia's 
> Phabricator|https://phabricator.wikimedia.org/T188041]). For simplicity, let's 
> say that these are the data:
> {noformat}
>
> {noformat}
> For each combination of (lang, page, day(timestamp)) I need to transform  the 
>  views for each hour:
> {noformat}
> 00:00 -> A
> 01:00 -> B
> ...
> {noformat}
> and concatenate the number of views for that hour.  So, if a page got 5 views 
> at 00:00 and 7 views at 01:00 it would become:
> {noformat}
> A5B7
> {noformat}
>  
> I have written a UDF called {{concat_hours}}.
> However, the function is mixing the columns and I am not sure what is going 
> on. I wrote here a minimal complete example that reproduces the issue on my 
> system (the details of my environment are above).
> {code:python}
> #!/usr/bin/env python3
> # coding: utf-8
> input_data = b"""en Albert_Camus 20071210-00 150
> en Albert_Camus 20071210-01 148
> en Albert_Camus 20071210-02 197
> en Albert_Camus 20071211-20 145
> en Albert_Camus 20071211-21 131
> en Albert_Camus 20071211-22 154
> en Albert_Camus 20071211-230001 142
> en Albert_Caquot 20071210-02 1
> en Albert_Caquot 20071210-02 1
> en Albert_Caquot 20071210-040001 1
> en Albert_Caquot 20071211-06 1
> en Albert_Caquot 20071211-08 1
> en Albert_Caquot 20071211-15 3
> en Albert_Caquot 20071211-21 1"""
> import tempfile
> fp = tempfile.NamedTemporaryFile()
> fp.write(input_data)
> fp.seek(0)
> import findspark
> findspark.init()
> import pyspark
> from pyspark.sql.types import StructType, StructField
> from pyspark.sql.types import StringType, IntegerType, TimestampType
> from pyspark.sql import functions
> sc = pyspark.SparkContext(appName="udf_example")
> sqlctx = pyspark.SQLContext(sc)
> schema = StructType([StructField("lang", StringType(), False),
>  StructField("page", StringType(), False),
>  StructField("timestamp", TimestampType(), False),
>  StructField("views", IntegerType(), False)])
> df = sqlctx.read.csv(fp.name,
>  header=False,
>  schema=schema,
>  timestampFormat="MMdd-HHmmss",
>  sep=' ')
> df.count()
> df.dtypes
> df.show()
> new_schema = StructType([StructField("lang", StringType(), False),
>  StructField("page", StringType(), False),
>  StructField("day", StringType(), False),
>  StructField("enc", StringType(), False)])
> from 

[jira] [Updated] (SPARK-24324) Pandas Grouped Map UserDefinedFunction mixes column labels

2018-05-25 Thread Li Jin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Jin updated SPARK-24324:
---
Issue Type: Sub-task  (was: Bug)
Parent: SPARK-22216

> Pandas Grouped Map UserDefinedFunction mixes column labels
> --
>
> Key: SPARK-24324
> URL: https://issues.apache.org/jira/browse/SPARK-24324
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 2.3.0
> Environment: Python (using virtualenv):
> {noformat}
> $ python --version 
> Python 3.6.5
> {noformat}
> Modules installed:
> {noformat}
> arrow==0.12.1
> backcall==0.1.0
> bleach==2.1.3
> chardet==3.0.4
> decorator==4.3.0
> entrypoints==0.2.3
> findspark==1.2.0
> html5lib==1.0.1
> ipdb==0.11
> ipykernel==4.8.2
> ipython==6.3.1
> ipython-genutils==0.2.0
> ipywidgets==7.2.1
> jedi==0.12.0
> Jinja2==2.10
> jsonschema==2.6.0
> jupyter==1.0.0
> jupyter-client==5.2.3
> jupyter-console==5.2.0
> jupyter-core==4.4.0
> MarkupSafe==1.0
> mistune==0.8.3
> nbconvert==5.3.1
> nbformat==4.4.0
> notebook==5.5.0
> numpy==1.14.3
> pandas==0.22.0
> pandocfilters==1.4.2
> parso==0.2.0
> pbr==3.1.1
> pexpect==4.5.0
> pickleshare==0.7.4
> progressbar2==3.37.1
> prompt-toolkit==1.0.15
> ptyprocess==0.5.2
> pyarrow==0.9.0
> Pygments==2.2.0
> python-dateutil==2.7.2
> python-utils==2.3.0
> pytz==2018.4
> pyzmq==17.0.0
> qtconsole==4.3.1
> Send2Trash==1.5.0
> simplegeneric==0.8.1
> six==1.11.0
> SQLAlchemy==1.2.7
> stevedore==1.28.0
> terminado==0.8.1
> testpath==0.3.1
> tornado==5.0.2
> traitlets==4.3.2
> virtualenv==15.1.0
> virtualenv-clone==0.2.6
> virtualenvwrapper==4.7.2
> wcwidth==0.1.7
> webencodings==0.5.1
> widgetsnbextension==3.2.1
> {noformat}
>  
> Java:
> {noformat}
> $ java -version 
>  java version "1.8.0_171"
>  Java(TM) SE Runtime Environment (build 1.8.0_171-b11)
>  Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode){noformat}
> System:
> {noformat}
> $ lsb_release -a
> No LSB modules are available.
> Distributor ID:   Ubuntu
> Description:  Ubuntu 16.04.4 LTS
> Release:  16.04
> Codename: xenial
> {noformat}
>Reporter: Cristian Consonni
>Priority: Major
>
> I am working on Wikipedia page views (see [task T188041 on Wikimedia's 
> Phabricator|https://phabricator.wikimedia.org/T188041]). For simplicity, let's 
> say that these are the data:
> {noformat}
>
> {noformat}
> For each combination of (lang, page, day(timestamp)) I need to transform  the 
>  views for each hour:
> {noformat}
> 00:00 -> A
> 01:00 -> B
> ...
> {noformat}
> and concatenate the number of views for that hour.  So, if a page got 5 views 
> at 00:00 and 7 views at 01:00 it would become:
> {noformat}
> A5B7
> {noformat}
>  
> I have written a UDF called {{concat_hours}}.
> However, the function is mixing the columns and I am not sure what is going 
> on. I wrote here a minimal complete example that reproduces the issue on my 
> system (the details of my environment are above).
> {code:python}
> #!/usr/bin/env python3
> # coding: utf-8
> input_data = b"""en Albert_Camus 20071210-00 150
> en Albert_Camus 20071210-01 148
> en Albert_Camus 20071210-02 197
> en Albert_Camus 20071211-20 145
> en Albert_Camus 20071211-21 131
> en Albert_Camus 20071211-22 154
> en Albert_Camus 20071211-230001 142
> en Albert_Caquot 20071210-02 1
> en Albert_Caquot 20071210-02 1
> en Albert_Caquot 20071210-040001 1
> en Albert_Caquot 20071211-06 1
> en Albert_Caquot 20071211-08 1
> en Albert_Caquot 20071211-15 3
> en Albert_Caquot 20071211-21 1"""
> import tempfile
> fp = tempfile.NamedTemporaryFile()
> fp.write(input_data)
> fp.seek(0)
> import findspark
> findspark.init()
> import pyspark
> from pyspark.sql.types import StructType, StructField
> from pyspark.sql.types import StringType, IntegerType, TimestampType
> from pyspark.sql import functions
> sc = pyspark.SparkContext(appName="udf_example")
> sqlctx = pyspark.SQLContext(sc)
> schema = StructType([StructField("lang", StringType(), False),
>  StructField("page", StringType(), False),
>  StructField("timestamp", TimestampType(), False),
>  StructField("views", IntegerType(), False)])
> df = sqlctx.read.csv(fp.name,
>  header=False,
>  schema=schema,
>  timestampFormat="MMdd-HHmmss",
>  sep=' ')
> df.count()
> df.dtypes
> df.show()
> new_schema = StructType([StructField("lang", StringType(), False),
>  StructField("page", StringType(), False),
>  StructField("day", StringType(), False),
>  StructField("enc", StringType(), False)])
> from pyspark.sql.functions import 

[jira] [Updated] (SPARK-22809) pyspark is sensitive to imports with dots

2018-05-25 Thread Marcelo Vanzin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin updated SPARK-22809:
---
Target Version/s: 2.4.0  (was: 2.3.1, 2.4.0)

> pyspark is sensitive to imports with dots
> -
>
> Key: SPARK-22809
> URL: https://issues.apache.org/jira/browse/SPARK-22809
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Cricket Temple
>Assignee: holdenk
>Priority: Major
>
> User code can fail with dotted imports.  Here's a repro script.
> {noformat}
> import numpy as np
> import pandas as pd
> import pyspark
> import scipy.interpolate
> import scipy.interpolate as scipy_interpolate
> import py4j
> scipy_interpolate2 = scipy.interpolate
> sc = pyspark.SparkContext()
> spark_session = pyspark.SQLContext(sc)
> ###
> # The details of this dataset are irrelevant  #
> # Sorry if you'd have preferred something more boring #
> ###
> x__ = np.linspace(0,10,1000)
> freq__ = np.arange(1,5)
> x_, freq_ = np.ix_(x__, freq__)
> y = np.sin(x_ * freq_).ravel()
> x = (x_ * np.ones(freq_.shape)).ravel()
> freq = (np.ones(x_.shape) * freq_).ravel()
> df_pd = pd.DataFrame(np.stack([x,y,freq]).T, columns=['x','y','freq'])
> df_sk = spark_session.createDataFrame(df_pd)
> assert(df_sk.toPandas() == df_pd).all().all()
> try:
> import matplotlib.pyplot as plt
> for f, data in df_pd.groupby("freq"):
> plt.plot(*data[['x','y']].values.T)
> plt.show()
> except:
> print("I guess we can't plot anything")
> def mymap(x, interp_fn):
> df = pd.DataFrame.from_records([row.asDict() for row in list(x)])
> return interp_fn(df.x.values, df.y.values)(np.pi)
> df_by_freq = df_sk.rdd.keyBy(lambda x: x.freq).groupByKey()
> result = df_by_freq.mapValues(lambda x: mymap(x, 
> scipy_interpolate.interp1d)).collect()
> assert(np.allclose(np.array(zip(*result)[1]), np.zeros(len(freq__)), 
> atol=1e-6))
> try:
> result = df_by_freq.mapValues(lambda x: mymap(x, 
> scipy.interpolate.interp1d)).collect()
> raise Exception("Not going to reach this line")
> except py4j.protocol.Py4JJavaError, e:
> print("See?")
> result = df_by_freq.mapValues(lambda x: mymap(x, 
> scipy_interpolate2.interp1d)).collect()
> assert(np.allclose(np.array(zip(*result)[1]), np.zeros(len(freq__)), 
> atol=1e-6))
> # But now it works!
> result = df_by_freq.mapValues(lambda x: mymap(x, 
> scipy.interpolate.interp1d)).collect()
> assert(np.allclose(np.array(zip(*result)[1]), np.zeros(len(freq__)), 
> atol=1e-6))
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24383) spark on k8s: "driver-svc" are not getting deleted

2018-05-25 Thread Yinan Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491106#comment-16491106
 ] 

Yinan Li commented on SPARK-24383:
--

OK, then garbage collection should kick in and delete the service when the 
driver pod is gone unless there's some issue with the GC.

> spark on k8s: "driver-svc" are not getting deleted
> --
>
> Key: SPARK-24383
> URL: https://issues.apache.org/jira/browse/SPARK-24383
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: Lenin
>Priority: Major
>
> When the driver pod exits, the "*driver-svc" services created for the driver 
> are not cleaned up. This causes accumulation of services in the k8s layer; at 
> some point, no more services can be created. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22809) pyspark is sensitive to imports with dots

2018-05-25 Thread Marcelo Vanzin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491107#comment-16491107
 ] 

Marcelo Vanzin commented on SPARK-22809:


I'm removing 2.3.1 since it doesn't seem there's any activity here. Please 
re-add if you plan to work on this for that release.

> pyspark is sensitive to imports with dots
> -
>
> Key: SPARK-22809
> URL: https://issues.apache.org/jira/browse/SPARK-22809
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Cricket Temple
>Assignee: holdenk
>Priority: Major
>
> User code can fail with dotted imports.  Here's a repro script.
> {noformat}
> import numpy as np
> import pandas as pd
> import pyspark
> import scipy.interpolate
> import scipy.interpolate as scipy_interpolate
> import py4j
> scipy_interpolate2 = scipy.interpolate
> sc = pyspark.SparkContext()
> spark_session = pyspark.SQLContext(sc)
> ###
> # The details of this dataset are irrelevant  #
> # Sorry if you'd have preferred something more boring #
> ###
> x__ = np.linspace(0,10,1000)
> freq__ = np.arange(1,5)
> x_, freq_ = np.ix_(x__, freq__)
> y = np.sin(x_ * freq_).ravel()
> x = (x_ * np.ones(freq_.shape)).ravel()
> freq = (np.ones(x_.shape) * freq_).ravel()
> df_pd = pd.DataFrame(np.stack([x,y,freq]).T, columns=['x','y','freq'])
> df_sk = spark_session.createDataFrame(df_pd)
> assert(df_sk.toPandas() == df_pd).all().all()
> try:
> import matplotlib.pyplot as plt
> for f, data in df_pd.groupby("freq"):
> plt.plot(*data[['x','y']].values.T)
> plt.show()
> except:
> print("I guess we can't plot anything")
> def mymap(x, interp_fn):
> df = pd.DataFrame.from_records([row.asDict() for row in list(x)])
> return interp_fn(df.x.values, df.y.values)(np.pi)
> df_by_freq = df_sk.rdd.keyBy(lambda x: x.freq).groupByKey()
> result = df_by_freq.mapValues(lambda x: mymap(x, 
> scipy_interpolate.interp1d)).collect()
> assert(np.allclose(np.array(zip(*result)[1]), np.zeros(len(freq__)), 
> atol=1e-6))
> try:
> result = df_by_freq.mapValues(lambda x: mymap(x, 
> scipy.interpolate.interp1d)).collect()
> raise Exception("Not going to reach this line")
> except py4j.protocol.Py4JJavaError, e:
> print("See?")
> result = df_by_freq.mapValues(lambda x: mymap(x, 
> scipy_interpolate2.interp1d)).collect()
> assert(np.allclose(np.array(zip(*result)[1]), np.zeros(len(freq__)), 
> atol=1e-6))
> # But now it works!
> result = df_by_freq.mapValues(lambda x: mymap(x, 
> scipy.interpolate.interp1d)).collect()
> assert(np.allclose(np.array(zip(*result)[1]), np.zeros(len(freq__)), 
> atol=1e-6))
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24392) Mark pandas_udf as Experimental

2018-05-25 Thread Marcelo Vanzin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491099#comment-16491099
 ] 

Marcelo Vanzin commented on SPARK-24392:


What release is this supposed to block?

> Mark pandas_udf as Experimental
> ---
>
> Key: SPARK-24392
> URL: https://issues.apache.org/jira/browse/SPARK-24392
> Project: Spark
>  Issue Type: Task
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Bryan Cutler
>Priority: Blocker
>
> This functionality is still evolving and has introduced some bugs. It was 
> an oversight to not mark it as experimental before it was released in 2.3.0.  
> Not sure if it is a good idea to change this after the fact, but I'm opening 
> this to continue discussion from 
> https://github.com/apache/spark/pull/21427#issuecomment-391967423



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data

2018-05-25 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491088#comment-16491088
 ] 

Xiao Li commented on SPARK-24373:
-

BTW, I plan to continue my work on https://github.com/apache/spark/pull/18717, 
which will add an eager persist/cache API. 
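
For context, here is a minimal sketch of the pattern such an eager API would 
replace, i.e. today's cache-then-action idiom. This is only an illustration, 
not an official API; the setup is copied from the repro quoted below and the 
println text is illustrative.

{code:java}
// Run in spark-shell (local mode). Setup copied from the repro quoted below.
import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._

val df = sc.range(1, 2).toDF
val myudf = udf({ x: Long => println("evaluating udf"); x + 1 })
val df1 = df.withColumn("value1", myudf(col("value")))

// Today's idiom for eager materialization: cache, then run an action over
// df1 itself. Per the explain output quoted below, df1's own plan contains
// the InMemoryTableScan, so this should populate the cache (and evaluate
// the UDF), unlike the df1.count() path discussed in this ticket.
df1.cache()
df1.foreach(_ => ())
{code}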

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Major
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(<function1>,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> In Spark 2.2, you could see it print "".
> In the above example, when you run explain(), you can see:
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain():
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data

2018-05-25 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491081#comment-16491081
 ] 

Xiao Li edited comment on SPARK-24373 at 5/25/18 5:57 PM:
--

[~icexelloss] [~aweise] Are you also using the Dataset APIs groupBy(), 
rollup(), cube(), pivot() and groupByKey()?


was (Author: smilegator):
[~icexelloss] [~aweise] Are you also using the Dataset APIs groupBy(), 
rollup(), cube(), rollup, pivot()?

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Major
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(<function1>,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> In Spark 2.2, you could see it print "".
> In the above example, when you run explain(), you can see:
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain():
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data

2018-05-25 Thread Li Jin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491086#comment-16491086
 ] 

Li Jin commented on SPARK-24373:


We use groupby() and pivot()

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Major
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(<function1>,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> In Spark 2.2, you could see it print "".
> In the above example, when you run explain(), you can see:
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain():
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data

2018-05-25 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491081#comment-16491081
 ] 

Xiao Li commented on SPARK-24373:
-

[~icexelloss] [~aweise] Are you also using the Dataset APIs groupBy(), 
rollup(), cube(), rollup, pivot()?

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Major
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(<function1>,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> In Spark 2.2, you could see it print "".
> In the above example, when you run explain(), you can see:
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain():
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24392) Mark pandas_udf as Experimental

2018-05-25 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24392:


Assignee: Apache Spark

> Mark pandas_udf as Experimental
> ---
>
> Key: SPARK-24392
> URL: https://issues.apache.org/jira/browse/SPARK-24392
> Project: Spark
>  Issue Type: Task
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Bryan Cutler
>Assignee: Apache Spark
>Priority: Blocker
>
> This functionality is still evolving and has introduced some bugs. It was 
> an oversight to not mark it as experimental before it was released in 2.3.0.  
> Not sure if it is a good idea to change this after the fact, but I'm opening 
> this to continue discussion from 
> https://github.com/apache/spark/pull/21427#issuecomment-391967423



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24392) Mark pandas_udf as Experimental

2018-05-25 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491042#comment-16491042
 ] 

Apache Spark commented on SPARK-24392:
--

User 'BryanCutler' has created a pull request for this issue:
https://github.com/apache/spark/pull/21435

> Mark pandas_udf as Experimental
> ---
>
> Key: SPARK-24392
> URL: https://issues.apache.org/jira/browse/SPARK-24392
> Project: Spark
>  Issue Type: Task
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Bryan Cutler
>Priority: Blocker
>
> This functionality is still evolving and has introduced some bugs. It was 
> an oversight to not mark it as experimental before it was released in 2.3.0.  
> Not sure if it is a good idea to change this after the fact, but I'm opening 
> this to continue discussion from 
> https://github.com/apache/spark/pull/21427#issuecomment-391967423



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24392) Mark pandas_udf as Experimental

2018-05-25 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24392:


Assignee: (was: Apache Spark)

> Mark pandas_udf as Experimental
> ---
>
> Key: SPARK-24392
> URL: https://issues.apache.org/jira/browse/SPARK-24392
> Project: Spark
>  Issue Type: Task
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Bryan Cutler
>Priority: Blocker
>
> This functionality is still evolving and has introduced some bugs. It was 
> an oversight to not mark it as experimental before it was released in 2.3.0.  
> Not sure if it is a good idea to change this after the fact, but I'm opening 
> this to continue discussion from 
> https://github.com/apache/spark/pull/21427#issuecomment-391967423



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24331) Add arrays_overlap / array_repeat / map_entries

2018-05-25 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16490996#comment-16490996
 ] 

Apache Spark commented on SPARK-24331:
--

User 'mn-mikke' has created a pull request for this issue:
https://github.com/apache/spark/pull/21434

> Add arrays_overlap / array_repeat / map_entries  
> -
>
> Key: SPARK-24331
> URL: https://issues.apache.org/jira/browse/SPARK-24331
> Project: Spark
>  Issue Type: Sub-task
>  Components: SparkR
>Affects Versions: 2.4.0
>Reporter: Marek Novotny
>Priority: Major
>
> Add SparkR equivalent to:
>  * arrays_overlap - SPARK-23922
>  * array_repeat - SPARK-23925
>  * map_entries - SPARK-23935



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24331) Add arrays_overlap / array_repeat / map_entries

2018-05-25 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24331:


Assignee: Apache Spark

> Add arrays_overlap / array_repeat / map_entries  
> -
>
> Key: SPARK-24331
> URL: https://issues.apache.org/jira/browse/SPARK-24331
> Project: Spark
>  Issue Type: Sub-task
>  Components: SparkR
>Affects Versions: 2.4.0
>Reporter: Marek Novotny
>Assignee: Apache Spark
>Priority: Major
>
> Add SparkR equivalent to:
>  * arrays_overlap - SPARK-23922
>  * array_repeat - SPARK-23925
>  * map_entries - SPARK-23935



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24331) Add arrays_overlap / array_repeat / map_entries

2018-05-25 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24331:


Assignee: (was: Apache Spark)

> Add arrays_overlap / array_repeat / map_entries  
> -
>
> Key: SPARK-24331
> URL: https://issues.apache.org/jira/browse/SPARK-24331
> Project: Spark
>  Issue Type: Sub-task
>  Components: SparkR
>Affects Versions: 2.4.0
>Reporter: Marek Novotny
>Priority: Major
>
> Add SparkR equivalent to:
>  * arrays_overlap - SPARK-23922
>  * array_repeat - SPARK-23925
>  * map_entries - SPARK-23935



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24392) Mark pandas_udf as Experimental

2018-05-25 Thread Bryan Cutler (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bryan Cutler updated SPARK-24392:
-
Priority: Blocker  (was: Critical)

> Mark pandas_udf as Experimental
> ---
>
> Key: SPARK-24392
> URL: https://issues.apache.org/jira/browse/SPARK-24392
> Project: Spark
>  Issue Type: Task
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Bryan Cutler
>Priority: Blocker
>
> This functionality is still evolving and has introduced some bugs. It was 
> an oversight to not mark it as experimental before it was released in 2.3.0.  
> Not sure if it is a good idea to change this after the fact, but I'm opening 
> this to continue discussion from 
> https://github.com/apache/spark/pull/21427#issuecomment-391967423



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24392) Mark pandas_udf as Experimental

2018-05-25 Thread Bryan Cutler (JIRA)
Bryan Cutler created SPARK-24392:


 Summary: Mark pandas_udf as Experimental
 Key: SPARK-24392
 URL: https://issues.apache.org/jira/browse/SPARK-24392
 Project: Spark
  Issue Type: Task
  Components: PySpark
Affects Versions: 2.3.0
Reporter: Bryan Cutler


This functionality is still evolving and has introduced some bugs. It was an 
oversight to not mark it as experimental before it was released in 2.3.0.  Not 
sure if it is a good idea to change this after the fact, but I'm opening this 
to continue discussion from 
https://github.com/apache/spark/pull/21427#issuecomment-391967423



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24391) to_json/from_json should support arrays of primitives, and more generally all JSON

2018-05-25 Thread Sam Kitajima-Kimbrel (JIRA)
Sam Kitajima-Kimbrel created SPARK-24391:


 Summary: to_json/from_json should support arrays of primitives, 
and more generally all JSON 
 Key: SPARK-24391
 URL: https://issues.apache.org/jira/browse/SPARK-24391
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0
Reporter: Sam Kitajima-Kimbrel


https://issues.apache.org/jira/browse/SPARK-19849 and 
https://issues.apache.org/jira/browse/SPARK-21513 brought support for more 
column types to functions.to_json/from_json, but I also have cases where I'd 
like to simply (de)serialize an array of primitives to/from JSON when 
outputting to certain destinations, which does not work:
{code:java}
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import spark.implicits._
import spark.implicits._

scala> val df = Seq("[1, 2, 3]").toDF("a")
df: org.apache.spark.sql.DataFrame = [a: string]

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val schema = new ArrayType(IntegerType, false)
schema: org.apache.spark.sql.types.ArrayType = ArrayType(IntegerType,false)

scala> df.select(from_json($"a", schema))
org.apache.spark.sql.AnalysisException: cannot resolve 'jsontostructs(`a`)' due 
to data type mismatch: Input schema array must be a struct or an array of 
structs.;;
'Project [jsontostructs(ArrayType(IntegerType,false), a#3, 
Some(America/Los_Angeles)) AS jsontostructs(a)#10]

scala> val arrayDf = Seq(Array(1, 2, 3)).toDF("arr")
arrayDf: org.apache.spark.sql.DataFrame = [arr: array<int>]

scala> arrayDf.select(to_json($"arr"))
org.apache.spark.sql.AnalysisException: cannot resolve 'structstojson(`arr`)' 
due to data type mismatch: Input type array must be a struct, array of 
structs or a map or array of map.;;
'Project [structstojson(arr#19, Some(America/Los_Angeles)) AS 
structstojson(arr)#26]
{code}
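
For the to_json direction there is a partial workaround (not mentioned in this 
report): wrap the primitive array in a struct, which the current API does 
accept, at the cost of emitting a JSON object such as {"arr":[1,2,3]} instead 
of a bare array. A minimal spark-shell sketch; the arr_json column name is 
just illustrative:

{code:java}
// Wrap the array column in a struct so to_json accepts it.
import org.apache.spark.sql.functions.{struct, to_json}
import spark.implicits._

val arrayDf = Seq(Array(1, 2, 3)).toDF("arr")
// Produces {"arr":[1,2,3]} rather than the bare array [1,2,3].
arrayDf.select(to_json(struct($"arr")).as("arr_json")).show(false)
{code}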



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23820) Allow the long form of call sites to be recorded in the log

2018-05-25 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23820:


Assignee: Apache Spark

> Allow the long form of call sites to be recorded in the log
> ---
>
> Key: SPARK-23820
> URL: https://issues.apache.org/jira/browse/SPARK-23820
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Michael Mior
>Assignee: Apache Spark
>Priority: Major
>
> It would be nice if the long form of the callsite information could be 
> included in the log. An example of what I'm proposing is here: 
> https://github.com/michaelmior/spark/commit/4b4076cfb1d51ceb20fd2b0a3b1b5be2aebb6416



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23820) Allow the long form of call sites to be recorded in the log

2018-05-25 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23820:


Assignee: (was: Apache Spark)

> Allow the long form of call sites to be recorded in the log
> ---
>
> Key: SPARK-23820
> URL: https://issues.apache.org/jira/browse/SPARK-23820
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Michael Mior
>Priority: Major
>
> It would be nice if the long form of the callsite information could be 
> included in the log. An example of what I'm proposing is here: 
> https://github.com/michaelmior/spark/commit/4b4076cfb1d51ceb20fd2b0a3b1b5be2aebb6416



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23820) Allow the long form of call sites to be recorded in the log

2018-05-25 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16490905#comment-16490905
 ] 

Apache Spark commented on SPARK-23820:
--

User 'michaelmior' has created a pull request for this issue:
https://github.com/apache/spark/pull/21433

> Allow the long form of call sites to be recorded in the log
> ---
>
> Key: SPARK-23820
> URL: https://issues.apache.org/jira/browse/SPARK-23820
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Michael Mior
>Priority: Major
>
> It would be nice if the long form of the callsite information could be 
> included in the log. An example of what I'm proposing is here: 
> https://github.com/michaelmior/spark/commit/4b4076cfb1d51ceb20fd2b0a3b1b5be2aebb6416



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24331) Add arrays_overlap / array_repeat / map_entries

2018-05-25 Thread Marek Novotny (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marek Novotny updated SPARK-24331:
--
Description: 
Add SparkR equivalent to:
 * arrays_overlap - SPARK-23922
 * array_repeat - SPARK-23925
 * map_entries - SPARK-23935

  was:
Add SparkR equivalent to:
 * cardinality - SPARK-23923
 * arrays_overlap - SPARK-23922
 * array_repeat - SPARK-23925
 * map_entries - SPARK-23935


> Add arrays_overlap / array_repeat / map_entries  
> -
>
> Key: SPARK-24331
> URL: https://issues.apache.org/jira/browse/SPARK-24331
> Project: Spark
>  Issue Type: Sub-task
>  Components: SparkR
>Affects Versions: 2.4.0
>Reporter: Marek Novotny
>Priority: Major
>
> Add SparkR equivalent to:
>  * arrays_overlap - SPARK-23922
>  * array_repeat - SPARK-23925
>  * map_entries - SPARK-23935



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-24380) argument quoting/escaping broken in mesos cluster scheduler

2018-05-25 Thread paul mackles (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

paul mackles closed SPARK-24380.


> argument quoting/escaping broken in mesos cluster scheduler
> ---
>
> Key: SPARK-24380
> URL: https://issues.apache.org/jira/browse/SPARK-24380
> Project: Spark
>  Issue Type: Bug
>  Components: Deploy, Mesos
>Affects Versions: 2.2.0, 2.3.0
>Reporter: paul mackles
>Priority: Critical
> Fix For: 2.4.0
>
>
> When a configuration property contains shell characters that require quoting, 
> the Mesos cluster scheduler generates the spark-submit argument like so:
> {code:java}
> --conf "spark.mesos.executor.docker.parameters="label=logging=|foo|""{code}
> Note the quotes around the property value as well as the key=value pair. When 
> using docker, this breaks the spark-submit command and causes the "|" to be 
> interpreted as an actual shell PIPE. Spaces, semicolons, etc. also cause 
> issues.
> Although I haven't tried, I suspect this is also a potential security issue 
> in that someone could exploit it to run arbitrary code on the host.
> My patch is pretty minimal and just removes the outer quotes around the 
> key=value pair, resulting in something like:
> {code:java}
> --conf spark.mesos.executor.docker.parameters="label=logging=|foo|"{code}
> A more extensive fix might try wrapping the entire key=value pair in single 
> quotes but I was concerned about backwards compatibility with that change.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24380) argument quoting/escaping broken in mesos cluster scheduler

2018-05-25 Thread paul mackles (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

paul mackles resolved SPARK-24380.
--
Resolution: Duplicate

Dupe of SPARK-23941, just a different config

> argument quoting/escaping broken in mesos cluster scheduler
> ---
>
> Key: SPARK-24380
> URL: https://issues.apache.org/jira/browse/SPARK-24380
> Project: Spark
>  Issue Type: Bug
>  Components: Deploy, Mesos
>Affects Versions: 2.2.0, 2.3.0
>Reporter: paul mackles
>Priority: Critical
> Fix For: 2.4.0
>
>
> When a configuration property contains shell characters that require quoting, 
> the Mesos cluster scheduler generates the spark-submit argument like so:
> {code:java}
> --conf "spark.mesos.executor.docker.parameters="label=logging=|foo|""{code}
> Note the quotes around the property value as well as the key=value pair. When 
> using docker, this breaks the spark-submit command and causes the "|" to be 
> interpreted as an actual shell PIPE. Spaces, semicolons, etc. also cause 
> issues.
> Although I haven't tried, I suspect this is also a potential security issue 
> in that someone could exploit it to run arbitrary code on the host.
> My patch is pretty minimal and just removes the outer quotes around the 
> key=value pair, resulting in something like:
> {code:java}
> --conf spark.mesos.executor.docker.parameters="label=logging=|foo|"{code}
> A more extensive fix might try wrapping the entire key=value pair in single 
> quotes but I was concerned about backwards compatibility with that change.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24331) Add arrays_overlap / array_repeat / map_entries

2018-05-25 Thread Marek Novotny (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marek Novotny updated SPARK-24331:
--
Summary: Add arrays_overlap / array_repeat / map_entries  (was: Add 
cardinality / arrays_overlap / array_repeat / map_entries)

> Add arrays_overlap / array_repeat / map_entries  
> -
>
> Key: SPARK-24331
> URL: https://issues.apache.org/jira/browse/SPARK-24331
> Project: Spark
>  Issue Type: Sub-task
>  Components: SparkR
>Affects Versions: 2.4.0
>Reporter: Marek Novotny
>Priority: Major
>
> Add SparkR equivalent to:
>  * cardinality - SPARK-23923
>  * arrays_overlap - SPARK-23922
>  * array_repeat - SPARK-23925
>  * map_entries - SPARK-23935



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24390) confusion of columns in projection after WITH ROLLUP

2018-05-25 Thread Ryan Foss (JIRA)
Ryan Foss created SPARK-24390:
-

 Summary: confusion of columns in projection after WITH ROLLUP
 Key: SPARK-24390
 URL: https://issues.apache.org/jira/browse/SPARK-24390
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0
 Environment: Databricks runtime 4.0
Reporter: Ryan Foss


Using two CTEs, where the first uses WITH ROLLUP and the second is a 
projection of the first, attempting to join the two CTEs fails: Spark seems to 
consider the key column in each CTE to be the same column, resulting in a 
"cannot resolve column" error.

 
{noformat}
CREATE TABLE IF NOT EXISTS test_rollup (key varchar(3), code varchar(3), stuff 
int);

EXPLAIN

WITH
cte1 AS (
  SELECT
key,
code,
struct(code, avg(stuff)) AS stuff
  FROM test_rollup
  GROUP BY key, code WITH ROLLUP
),

cte2 AS (
  SELECT
key,
collect_list(stuff) AS stuff_details
  FROM cte1
  WHERE code IS NOT NULL
  GROUP BY key
)

-- join summary record from cte1 to cte2
SELECT c1.key, c1.stuff AS summary_stuff, c2.stuff_details AS detail_stuff
FROM cte1 c1
JOIN cte2 c2
  ON c2.key = c1.key
WHERE c1.code IS NULL


== Physical Plan ==
org.apache.spark.sql.AnalysisException: cannot resolve '`c2.key`' given input columns: [c1.key, c1.code, c1.stuff, c2.stuff_details]; line 22 pos 5;
'Project ['c1.key, 'c1.stuff AS summary_stuff#5415, 'c2.stuff_details AS detail_stuff#5416]
+- 'Filter isnull('c1.code)
   +- 'Join Inner, ('c2.key = 'c1.key)
      :- SubqueryAlias c1
      :  +- SubqueryAlias cte1
      :     +- Aggregate [key#5429, code#5430, spark_grouping_id#5426], [key#5429, code#5430, named_struct(code, code#5430, col2, avg(cast(stuff#5424 as bigint))) AS stuff#5417]
      :        +- Expand [List(key#5422, code#5423, stuff#5424, key#5427, code#5428, 0), List(key#5422, code#5423, stuff#5424, key#5427, null, 1), List(key#5422, code#5423, stuff#5424, null, null, 3)], [key#5422, code#5423, stuff#5424, key#5429, code#5430, spark_grouping_id#5426]
      :           +- Project [key#5422, code#5423, stuff#5424, key#5422 AS key#5427, code#5423 AS code#5428]
{noformat}
Changing the cte2 query to add a column alias ("key AS key") causes the 
columns to be considered unique, resolving the join issue, as sketched below.
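
A minimal sketch of that workaround (assuming a SparkSession named `spark` and the `test_rollup` table created above; the only change from the failing query is the `key AS key` alias in cte2):

{code:scala}
// Per the report, aliasing "key AS key" in cte2 makes the two key columns
// distinct, so the join condition c2.key = c1.key resolves.
val fixed = spark.sql("""
  WITH
  cte1 AS (
    SELECT key, code, struct(code, avg(stuff)) AS stuff
    FROM test_rollup
    GROUP BY key, code WITH ROLLUP
  ),
  cte2 AS (
    SELECT key AS key, collect_list(stuff) AS stuff_details
    FROM cte1
    WHERE code IS NOT NULL
    GROUP BY key
  )
  SELECT c1.key, c1.stuff AS summary_stuff, c2.stuff_details AS detail_stuff
  FROM cte1 c1
  JOIN cte2 c2 ON c2.key = c1.key
  WHERE c1.code IS NULL
""")
fixed.explain(true)
{code}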



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24389) describe() can't work on column that name contain dots

2018-05-25 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16490797#comment-16490797
 ] 

Marco Gaido commented on SPARK-24389:
-

I cannot reproduce this on current master. It has probably been fixed by SPARK-21100 
(i.e. in 2.3.0). I think we can close this.

> describe() can't work on column that name contain dots
> --
>
> Key: SPARK-24389
> URL: https://issues.apache.org/jira/browse/SPARK-24389
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: zhanggengxin
>Priority: Major
>
> {code:scala}
> val df = Seq((1, 1)).toDF("a_b", "a.c")
> df.describe() // won't work
> df.describe("`a.c`") // will work
> {code}
> This means you can't use describe() on a DataFrame whose column names contain dots.
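
For anyone still on 2.2, a minimal sketch of the backtick workaround from the report, generalized to every column (illustrative, not part of the original report):

{code:scala}
val df = Seq((1, 1)).toDF("a_b", "a.c")

// Quote each column name in backticks before passing it to describe();
// per the report, describe("`a.c`") works even when the bare describe() does not.
val quoted = df.columns.map(c => s"`$c`")
df.describe(quoted: _*).show()
{code}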



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data

2018-05-25 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24373:


Assignee: Apache Spark

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Assignee: Apache Spark
>Priority: Major
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> In Spark 2.2, you could see that it prints "". 
> In the above example, when you run explain, you can see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  
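
As a side note, a minimal workaround sketch until this is resolved (my own suggestion, not a confirmed fix): populate the cache with an action that scans every column, so the InMemoryRelation shown above is actually materialized.

{code:scala}
df1.cache()
// groupBy().count() no longer touches the InMemoryRelation (see the plans above),
// so trigger caching with an action that needs all columns.
df1.foreach(_ => ())   // or df1.collect() for small data
df1.count()            // later actions can now hit the cache
{code}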



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data

2018-05-25 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24373:


Assignee: (was: Apache Spark)

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Major
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> In Spark 2.2, you could see that it prints "". 
> In the above example, when you run explain, you can see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data

2018-05-25 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16490772#comment-16490772
 ] 

Apache Spark commented on SPARK-24373:
--

User 'mgaido91' has created a pull request for this issue:
https://github.com/apache/spark/pull/21432

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Major
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> In Spark 2.2, you could see that it prints "". 
> In the above example, when you run explain, you can see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19112) add codec for ZStandard

2018-05-25 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16490749#comment-16490749
 ] 

Apache Spark commented on SPARK-19112:
--

User 'wangyum' has created a pull request for this issue:
https://github.com/apache/spark/pull/21431

> add codec for ZStandard
> ---
>
> Key: SPARK-19112
> URL: https://issues.apache.org/jira/browse/SPARK-19112
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: Thomas Graves
>Assignee: Sital Kedia
>Priority: Minor
> Fix For: 2.3.0
>
>
> ZStandard: https://github.com/facebook/zstd and 
> http://facebook.github.io/zstd/ has been in use for a while now. v1.0 was 
> recently released. Hadoop 
> (https://issues.apache.org/jira/browse/HADOOP-13578) and others 
> (https://issues.apache.org/jira/browse/KAFKA-4514) are adopting it.
> Zstd seems to give great results: Gzip-level compression at Lz4-level CPU cost.
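
For anyone wanting to try the new codec, a minimal sketch of enabling it (property names as shipped in 2.3.0; the level and buffer values are illustrative defaults, not recommendations):

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("zstd-codec-sketch")
  .config("spark.io.compression.codec", "zstd")          // use ZStandardCompressionCodec for internal data
  .config("spark.io.compression.zstd.level", "1")        // illustrative compression level
  .config("spark.io.compression.zstd.bufferSize", "32k") // illustrative buffer size
  .getOrCreate()
{code}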



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23991) data loss when allocateBlocksToBatch

2018-05-25 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16490729#comment-16490729
 ] 

Apache Spark commented on SPARK-23991:
--

User 'gaborgsomogyi' has created a pull request for this issue:
https://github.com/apache/spark/pull/21430

> data loss when allocateBlocksToBatch
> 
>
> Key: SPARK-23991
> URL: https://issues.apache.org/jira/browse/SPARK-23991
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Input/Output
>Affects Versions: 2.2.0
> Environment: spark 2.11
>Reporter: kevin fu
>Priority: Major
>
> With checkpointing and the WAL enabled, the driver writes the allocation of blocks 
> to a batch into HDFS. However, if that write fails as shown below, the blocks of this 
> batch cannot be computed by the DAG, because they have already been dequeued 
> from the receivedBlockQueue and are lost.
> {panel:title=error log}
> 18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing 
> record: BatchAllocationEvent(152376548 ms,AllocatedBlocks(Map(0 -> 
> ArrayBuffer( to the WriteAheadLog. org.apache.spark.SparkException: 
> Exception thrown in awaitResult: at 
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) at 
> org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
>  at 
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
>  at 
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118)
>  at 
> org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>  at scala.util.Try$.apply(Try.scala:192) at 
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Caused 
> by: java.util.concurrent.TimeoutException: Futures timed out after [5000 
> milliseconds] at 
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at 
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at 
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) at 
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>  at scala.concurrent.Await$.result(package.scala:190) at 
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190) ... 12 
> more 18/04/15 11:11:25 INFO ReceivedBlockTracker: Possibly processed batch 
> 152376548 ms needs to be processed again in WAL recovery{panel}
> The code in question is shown below:
> {code}
>   /**
>* Allocate all unallocated blocks to the given batch.
>* This event will get written to the write ahead log (if enabled).
>*/
>   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
> if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) 
> {
>   val streamIdToBlocks = streamIds.map { streamId =>
>   (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
>   }.toMap
>   val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
>   if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
> timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
> lastAllocatedBatchTime = batchTime
>   } else {
> logInfo(s"Possibly processed batch $batchTime needs to be processed 
> again in WAL recovery")
>   }
> } else {
>   // This situation occurs when:
>   // 1. WAL is ended with BatchAllocationEvent, but without 
> BatchCleanupEvent,
>   // possibly processed batch job or half-processed batch job need to be 
> processed again,
>   // so the batchTime will be equal to lastAllocatedBatchTime.
>   // 2. Slow checkpointing makes recovered batch time older than WAL 
> recovered
>   // lastAllocatedBatchTime.
>   // This situation will only occurs in recovery time.
>   logInfo(s"Possibly processed batch $batchTime needs to be processed 
> again in WAL recovery")
> }
>   }
> {code}
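
One possible fix direction (a sketch of the idea only, reusing the private helpers from the quoted method; it is not necessarily what the linked PR does): if the WAL write fails, push the dequeued blocks back onto their queues instead of silently dropping them.

{code:scala}
// Sketch: re-enqueue blocks when writeToLog fails, so a later allocation still sees them.
val streamIdToBlocks = streamIds.map { streamId =>
  (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
}.toMap
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
  timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
  lastAllocatedBatchTime = batchTime
} else {
  // WAL write failed: put the blocks back so they are not lost.
  streamIdToBlocks.foreach { case (streamId, blocks) =>
    getReceivedBlockQueue(streamId) ++= blocks
  }
  logInfo(s"WAL write failed for batch $batchTime; re-queued its blocks")
}
{code}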



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

[jira] [Assigned] (SPARK-23991) data loss when allocateBlocksToBatch

2018-05-25 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23991:


Assignee: (was: Apache Spark)

> data loss when allocateBlocksToBatch
> 
>
> Key: SPARK-23991
> URL: https://issues.apache.org/jira/browse/SPARK-23991
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Input/Output
>Affects Versions: 2.2.0
> Environment: spark 2.11
>Reporter: kevin fu
>Priority: Major
>
> With checkpointing and the WAL enabled, the driver writes the allocation of blocks 
> to a batch into HDFS. However, if that write fails as shown below, the blocks of this 
> batch cannot be computed by the DAG, because they have already been dequeued 
> from the receivedBlockQueue and are lost.
> {panel:title=error log}
> 18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing 
> record: BatchAllocationEvent(152376548 ms,AllocatedBlocks(Map(0 -> 
> ArrayBuffer( to the WriteAheadLog. org.apache.spark.SparkException: 
> Exception thrown in awaitResult: at 
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) at 
> org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
>  at 
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
>  at 
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118)
>  at 
> org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>  at scala.util.Try$.apply(Try.scala:192) at 
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Caused 
> by: java.util.concurrent.TimeoutException: Futures timed out after [5000 
> milliseconds] at 
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at 
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at 
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) at 
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>  at scala.concurrent.Await$.result(package.scala:190) at 
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190) ... 12 
> more 18/04/15 11:11:25 INFO ReceivedBlockTracker: Possibly processed batch 
> 152376548 ms needs to be processed again in WAL recovery{panel}
> The code in question is shown below:
> {code}
>   /**
>* Allocate all unallocated blocks to the given batch.
>* This event will get written to the write ahead log (if enabled).
>*/
>   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
> if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) 
> {
>   val streamIdToBlocks = streamIds.map { streamId =>
>   (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
>   }.toMap
>   val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
>   if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
> timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
> lastAllocatedBatchTime = batchTime
>   } else {
> logInfo(s"Possibly processed batch $batchTime needs to be processed 
> again in WAL recovery")
>   }
> } else {
>   // This situation occurs when:
>   // 1. WAL is ended with BatchAllocationEvent, but without 
> BatchCleanupEvent,
>   // possibly processed batch job or half-processed batch job need to be 
> processed again,
>   // so the batchTime will be equal to lastAllocatedBatchTime.
>   // 2. Slow checkpointing makes recovered batch time older than WAL 
> recovered
>   // lastAllocatedBatchTime.
>   // This situation will only occurs in recovery time.
>   logInfo(s"Possibly processed batch $batchTime needs to be processed 
> again in WAL recovery")
> }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23991) data loss when allocateBlocksToBatch

2018-05-25 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23991:


Assignee: Apache Spark

> data loss when allocateBlocksToBatch
> 
>
> Key: SPARK-23991
> URL: https://issues.apache.org/jira/browse/SPARK-23991
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Input/Output
>Affects Versions: 2.2.0
> Environment: spark 2.11
>Reporter: kevin fu
>Assignee: Apache Spark
>Priority: Major
>
> With checkpointing and the WAL enabled, the driver writes the allocation of blocks 
> to a batch into HDFS. However, if that write fails as shown below, the blocks of this 
> batch cannot be computed by the DAG, because they have already been dequeued 
> from the receivedBlockQueue and are lost.
> {panel:title=error log}
> 18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing 
> record: BatchAllocationEvent(152376548 ms,AllocatedBlocks(Map(0 -> 
> ArrayBuffer( to the WriteAheadLog. org.apache.spark.SparkException: 
> Exception thrown in awaitResult: at 
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) at 
> org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
>  at 
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
>  at 
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118)
>  at 
> org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>  at scala.util.Try$.apply(Try.scala:192) at 
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Caused 
> by: java.util.concurrent.TimeoutException: Futures timed out after [5000 
> milliseconds] at 
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at 
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at 
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) at 
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>  at scala.concurrent.Await$.result(package.scala:190) at 
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190) ... 12 
> more 18/04/15 11:11:25 INFO ReceivedBlockTracker: Possibly processed batch 
> 152376548 ms needs to be processed again in WAL recovery{panel}
> The code in question is shown below:
> {code}
>   /**
>* Allocate all unallocated blocks to the given batch.
>* This event will get written to the write ahead log (if enabled).
>*/
>   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
> if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) 
> {
>   val streamIdToBlocks = streamIds.map { streamId =>
>   (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
>   }.toMap
>   val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
>   if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
> timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
> lastAllocatedBatchTime = batchTime
>   } else {
> logInfo(s"Possibly processed batch $batchTime needs to be processed 
> again in WAL recovery")
>   }
> } else {
>   // This situation occurs when:
>   // 1. WAL is ended with BatchAllocationEvent, but without 
> BatchCleanupEvent,
>   // possibly processed batch job or half-processed batch job need to be 
> processed again,
>   // so the batchTime will be equal to lastAllocatedBatchTime.
>   // 2. Slow checkpointing makes recovered batch time older than WAL 
> recovered
>   // lastAllocatedBatchTime.
>   // This situation will only occurs in recovery time.
>   logInfo(s"Possibly processed batch $batchTime needs to be processed 
> again in WAL recovery")
> }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data

2018-05-25 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16490551#comment-16490551
 ] 

Marco Gaido commented on SPARK-24373:
-

[~wbzhao] yes, I do agree with you. That is the problem.

> "df.cache() df.count()" no longer eagerly caches data
> -
>
> Key: SPARK-24373
> URL: https://issues.apache.org/jira/browse/SPARK-24373
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Wenbo Zhao
>Priority: Major
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println(""); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> In Spark 2.2, you could see that it prints "". 
> In the above example, when you run explain, you can see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24389) describe() can't work on column that name contain dots

2018-05-25 Thread zhanggengxin (JIRA)
zhanggengxin created SPARK-24389:


 Summary: describe() can't work on column that name contain dots
 Key: SPARK-24389
 URL: https://issues.apache.org/jira/browse/SPARK-24389
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.0
Reporter: zhanggengxin


{code:scala}
val df = Seq((1, 1)).toDF("a_b", "a.c")
df.describe() // won't work
df.describe("`a.c`") // will work
{code}

This means you can't use describe() on a DataFrame whose column names contain dots.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24271) sc.hadoopConfigurations can not be overwritten in the same spark context

2018-05-25 Thread Jami Malikzade (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16490499#comment-16490499
 ] 

Jami Malikzade commented on SPARK-24271:


[~ste...@apache.org] Thank you

> sc.hadoopConfigurations can not be overwritten in the same spark context
> 
>
> Key: SPARK-24271
> URL: https://issues.apache.org/jira/browse/SPARK-24271
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell
>Affects Versions: 2.3.0
>Reporter: Jami Malikzade
>Priority: Major
>
> If, for example, we pass the following configs to the Spark context:
> sc.hadoopConfiguration.set("fs.s3a.access.key", "correctAK") 
> sc.hadoopConfiguration.set("fs.s3a.secret.key", "correctSK") 
> sc.hadoopConfiguration.set("fs.s3a.endpoint", "objectstorage:8773") //
> sc.hadoopConfiguration.set("fs.s3a.impl", 
> "org.apache.hadoop.fs.s3a.S3AFileSystem")
> sc.hadoopConfiguration.set("fs.s3a.connection.ssl.enabled", "false")
> We are later able to read from the bucket, so the behavior is as expected.
> If, in the same SparkContext, I change the credentials to wrong ones and try to read 
> from the bucket, it still works;
> and vice versa, if the credentials were wrong, changing them to working ones does not help.
> sc.hadoopConfiguration.set("fs.s3a.access.key", "wrongAK") // 
> sc.hadoopConfiguration.set("fs.s3a.secret.key", "wrongSK") //



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24271) sc.hadoopConfigurations can not be overwritten in the same spark context

2018-05-25 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16490474#comment-16490474
 ] 

Steve Loughran commented on SPARK-24271:


Disabling the s3a filesystem cache can be pretty inefficient, as every worker talking to a 
bucket is going to create a new client instance, with its own AWS thread pool and other resources.

if you are trying to change perms for different buckets, you can use 
[per-bucket configuration 
instead|https://hadoop.apache.org/docs/r3.1.0/hadoop-aws/tools/hadoop-aws/index.html#Configuring_different_S3_buckets_with_Per-Bucket_Configuration]
{code}

fs.s3a.bucket.myfirstbucket.access.key=A
fs.s3a.bucket.myfirstbucket.secret.key=

fs.s3a.bucket.backups.access.key=B
fs.s3a.bucket.backups.secret.key=Y
{code}
Same for things like endpoint.

These can all coexist in the same configuration file; I'd recommend putting them in 
spark-defaults.conf rather than in code, as with code it's all too easy to 
accidentally commit your secrets somewhere public like GitHub.
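
For completeness, a minimal sketch of the cache-disabling route referred to above, which also explains the reported behaviour: the cached FileSystem instance built with the first credentials keeps being reused, so later hadoopConfiguration changes are ignored. (fs.s3a.impl.disable.cache is the standard Hadoop per-scheme property; mind the efficiency caveat.)

{code:scala}
// Skip the FileSystem cache for s3a so each configuration change yields a
// freshly configured client. Every worker then builds its own S3A instance,
// thread pool included, which is the inefficiency mentioned above.
sc.hadoopConfiguration.set("fs.s3a.impl.disable.cache", "true")
sc.hadoopConfiguration.set("fs.s3a.access.key", "correctAK")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "correctSK")
{code}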

> sc.hadoopConfigurations can not be overwritten in the same spark context
> 
>
> Key: SPARK-24271
> URL: https://issues.apache.org/jira/browse/SPARK-24271
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell
>Affects Versions: 2.3.0
>Reporter: Jami Malikzade
>Priority: Major
>
> If, for example, we pass the following configs to the Spark context:
> sc.hadoopConfiguration.set("fs.s3a.access.key", "correctAK") 
> sc.hadoopConfiguration.set("fs.s3a.secret.key", "correctSK") 
> sc.hadoopConfiguration.set("fs.s3a.endpoint", "objectstorage:8773") //
> sc.hadoopConfiguration.set("fs.s3a.impl", 
> "org.apache.hadoop.fs.s3a.S3AFileSystem")
> sc.hadoopConfiguration.set("fs.s3a.connection.ssl.enabled", "false")
> We are later able to read from the bucket, so the behavior is as expected.
> If, in the same SparkContext, I change the credentials to wrong ones and try to read 
> from the bucket, it still works;
> and vice versa, if the credentials were wrong, changing them to working ones does not help.
> sc.hadoopConfiguration.set("fs.s3a.access.key", "wrongAK") // 
> sc.hadoopConfiguration.set("fs.s3a.secret.key", "wrongSK") //



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-17592) SQL: CAST string as INT inconsistent with Hive

2018-05-25 Thread Jorge Machado (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Machado updated SPARK-17592:
--
Comment: was deleted

(was: I'm hitting the same issue, I'm afraid, but in a slightly different way. When I 
have a dataframe (that comes from an Oracle DB) saved as parquet, I can see in the logs 
that a field is being saved as an integer: 

 

{ "type" : "struct", "fields" : [ \{ "name" : "project_id", "type" : "integer", 
"nullable" : true, "metadata" : { } },... 

 

on Hue (which reads from Hive) I see: 

!image-2018-05-24-17-10-24-515.png!)

> SQL: CAST string as INT inconsistent with Hive
> --
>
> Key: SPARK-17592
> URL: https://issues.apache.org/jira/browse/SPARK-17592
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Furcy Pin
>Priority: Major
> Attachments: image-2018-05-24-17-10-24-515.png
>
>
> Hello,
> there seems to be an inconsistency between Spark and Hive when casting a 
> string to an Int. 
> With Hive:
> {code}
> select cast("0.4" as INT) ;
> > 0
> select cast("0.5" as INT) ;
> > 0
> select cast("0.6" as INT) ;
> > 0
> {code}
> With Spark-SQL:
> {code}
> select cast("0.4" as INT) ;
> > 0
> select cast("0.5" as INT) ;
> > 1
> select cast("0.6" as INT) ;
> > 1
> {code}
> Hive seems to perform a floor(string.toDouble), while Spark seems to perform 
> a round(string.toDouble)
> I'm not sure there is any ISO standard for this; MySQL has the same behavior 
> as Hive, while PostgreSQL performs a string.toInt and throws a 
> NumberFormatException.
> Personally, I think Hive is right, hence my posting this here.
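
For anyone who needs Hive's truncating behaviour today, a minimal sketch (assuming a SparkSession named `spark`; it relies on the double-to-int cast truncating toward zero):

{code:scala}
// Cast to double first, then to int: the second cast truncates rather than rounds.
spark.sql("""SELECT cast(cast("0.6" AS double) AS int) AS truncated""").show()
// expected to print 0, matching Hive's cast("0.6" as INT)
{code}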



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24388) EventLoop's run method don't handle fatal error, causes driver hang forever

2018-05-25 Thread Xianjin YE (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16490373#comment-16490373
 ] 

Xianjin YE commented on SPARK-24388:


I am working on this and will send a pr soon.

> EventLoop's run method don't handle fatal error, causes driver hang forever
> ---
>
> Key: SPARK-24388
> URL: https://issues.apache.org/jira/browse/SPARK-24388
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.1.0, 2.1.1, 2.1.2, 2.2.0, 2.2.1, 2.3.0
>Reporter: Xianjin YE
>Priority: Major
>
> Once a fatal error (such as NoSuchMethodError) happens during 
> `onReceive(event)`, the eventThread thread will exit. However, the eventQueue 
> is still accepting events, so the whole Spark application will hang forever.
>  
> cc [~zsxwing] [~XuanYuan]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24388) EventLoop's run method don't handle fatal error, causes driver hang forever

2018-05-25 Thread Xianjin YE (JIRA)
Xianjin YE created SPARK-24388:
--

 Summary: EventLoop's run method don't handle fatal error, causes 
driver hang forever
 Key: SPARK-24388
 URL: https://issues.apache.org/jira/browse/SPARK-24388
 Project: Spark
  Issue Type: Bug
  Components: Scheduler, Spark Core
Affects Versions: 2.3.0, 2.2.1, 2.2.0, 2.1.2, 2.1.1, 2.1.0
Reporter: Xianjin YE


Once a fatal error (such as NoSuchMethodError) happens during 
`onReceive(event)`, the eventThread thread will exit. However, the eventQueue 
is still accepting events, so the whole Spark application will hang forever.

 

cc [~zsxwing] [~XuanYuan]
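
A minimal sketch of the kind of hardening being proposed (my reading of the issue, written against the members I'd expect on EventLoop; it is not the eventual patch): have the loop treat fatal errors explicitly so the application fails fast instead of hanging.

{code:scala}
import scala.util.control.NonFatal

// Inside the event loop's run(): today only NonFatal errors reach onError, so a
// NoSuchMethodError kills the thread silently while the queue keeps accepting events.
try {
  onReceive(event)
} catch {
  case NonFatal(e) =>
    try onError(e) catch { case NonFatal(e2) => logError("Unexpected error in onError", e2) }
  case t: Throwable =>
    // Fatal error: mark the loop as stopped and propagate, rather than hang forever.
    stopped.set(true)
    throw t
}
{code}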



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24387) Heartbeat-timeout executor is added back and used again

2018-05-25 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16490309#comment-16490309
 ] 

Rui Li commented on SPARK-24387:


When HeartbeatReceiver finds that the executor's heartbeat has timed out, it informs 
the TaskScheduler and kills the executor asynchronously. When the TaskScheduler 
handles the lost executor, it tries to revive offers from the backend. So I 
think there's a race condition where the backend may make offers before killing 
the executor. And since this is the only executor left, it's offered to the 
TaskScheduler and the retried task is scheduled onto it.

And when killing a heartbeat-timeout executor, we expect a replacement executor 
to be launched. But when the new executor is launched, there's no task for it 
to run. So it's kept idle until killed by dynamic allocation.
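
Not a fix for the race itself, but while this is open one common mitigation is to make heartbeat expiry less aggressive (standard Spark properties; the values below are illustrative only):

{code:scala}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.heartbeatInterval", "20s") // how often executors send heartbeats (illustrative)
  .set("spark.network.timeout", "600s")           // heartbeat expiry is derived from this (illustrative)
{code}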

> Heartbeat-timeout executor is added back and used again
> ---
>
> Key: SPARK-24387
> URL: https://issues.apache.org/jira/browse/SPARK-24387
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Rui Li
>Priority: Major
>
> In our job, when there's only one task and one executor running, the 
> executor's heartbeat is lost and the driver decides to remove it. However, the 
> executor is added again and the task's retry attempt is scheduled to that 
> executor, almost immediately after the executor is marked as lost.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24374) SPIP: Support Barrier Scheduling in Apache Spark

2018-05-25 Thread Wei Yan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16490307#comment-16490307
 ] 

Wei Yan commented on SPARK-24374:
-

Thanks [~mengxr] for the initiative and the doc. cc [~leftnoteasy] [~zhz], as we 
may need some support from the YARN side if running in yarn-cluster mode.

> SPIP: Support Barrier Scheduling in Apache Spark
> 
>
> Key: SPARK-24374
> URL: https://issues.apache.org/jira/browse/SPARK-24374
> Project: Spark
>  Issue Type: Epic
>  Components: ML, Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>Priority: Major
>  Labels: SPIP
> Attachments: SPIP_ Support Barrier Scheduling in Apache Spark.pdf
>
>
> (See details in the linked/attached SPIP doc.)
> {quote}
> The proposal here is to add a new scheduling model to Apache Spark so users 
> can properly embed distributed DL training as a Spark stage to simplify the 
> distributed training workflow. For example, Horovod uses MPI to implement 
> all-reduce to accelerate distributed TensorFlow training. The computation 
> model is different from MapReduce used by Spark. In Spark, a task in a stage 
> doesn’t depend on any other tasks in the same stage, and hence it can be 
> scheduled independently. In MPI, all workers start at the same time and pass 
> messages around. To embed this workload in Spark, we need to introduce a new 
> scheduling model, tentatively named “barrier scheduling”, which launches 
> tasks at the same time and provides users enough information and tooling to 
> embed distributed DL training. Spark can also provide an extra layer of fault 
> tolerance in case some tasks failed in the middle, where Spark would abort 
> all tasks and restart the stage.
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24387) Heartbeat-timeout executor is added back and used again

2018-05-25 Thread Rui Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16490293#comment-16490293
 ] 

Rui Li commented on SPARK-24387:


A snippet of the log w/ some fields masked:
{noformat}
[Stage 2:==>(199 + 1) / 
200]18/05/20 05:37:07 WARN HeartbeatReceiver: Removing executor 1100 with no 
recent heartbeats: 345110 ms exceeds timeout 30 ms
18/05/20 05:37:07 ERROR YarnClusterScheduler: Lost executor 1100 on HOSTA: 
Executor heartbeat timed out after 345110 ms
18/05/20 05:37:07 WARN TaskSetManager: Lost task 55.0 in stage 2.0 (TID 12080, 
HOSTA, executor 1100): ExecutorLostFailure (executor 1100 exited caused by one 
of the running tasks) Reason: Executor heartbeat timed out after 345110 ms
18/05/20 05:37:07 INFO DAGScheduler: Executor lost: 1100 (epoch 2)
18/05/20 05:37:07 INFO DAGScheduler: Host added was in lost list earlier: HOSTA
18/05/20 05:37:07 INFO TaskSetManager: Starting task 55.1 in stage 2.0 (TID 
12225, HOSTA, executor 1100, partition 55, PROCESS_LOCAL, 6227 bytes)
{noformat}

> Heartbeat-timeout executor is added back and used again
> ---
>
> Key: SPARK-24387
> URL: https://issues.apache.org/jira/browse/SPARK-24387
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Rui Li
>Priority: Major
>
> In our job, when there's only one task and one executor running, the 
> executor's heartbeat is lost and the driver decides to remove it. However, the 
> executor is added again and the task's retry attempt is scheduled to that 
> executor, almost immediately after the executor is marked as lost.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24387) Heartbeat-timeout executor is added back and used again

2018-05-25 Thread Rui Li (JIRA)
Rui Li created SPARK-24387:
--

 Summary: Heartbeat-timeout executor is added back and used again
 Key: SPARK-24387
 URL: https://issues.apache.org/jira/browse/SPARK-24387
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.1.0
Reporter: Rui Li


In our job, when there's only one task and one executor running, the executor's 
heartbeat is lost and the driver decides to remove it. However, the executor is 
added again and the task's retry attempt is scheduled to that executor, almost 
immediately after the executor is marked as lost.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org