[jira] [Assigned] (SPARK-22347) UDF is evaluated when 'F.when' condition is false

2017-10-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22347:


Assignee: (was: Apache Spark)

> UDF is evaluated when 'F.when' condition is false
> -
>
> Key: SPARK-22347
> URL: https://issues.apache.org/jira/browse/SPARK-22347
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Nicolas Porter
>Priority: Minor
>
> Here's a simple example of how to reproduce this:
> {code}
> from pyspark.sql import functions as F, Row, types
> def Divide10():
>     def fn(value): return 10 / int(value)
>     return F.udf(fn, types.IntegerType())
> df = sc.parallelize([Row(x=5), Row(x=0)]).toDF()
> x = F.col('x')
> df2 = df.select(F.when((x > 0), Divide10()(x)))
> df2.show(200)
> {code}
> This raises a division-by-zero error, even though `F.when` should filter 
> out all cases where `x <= 0`. I believe the correct behavior is not to 
> evaluate the UDF when the `F.when` condition is false.
> Interestingly, when the `F.when` condition is set to `F.lit(False)`, the 
> error is not raised and all rows resolve to `null`, which is the 
> expected result.
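Until this is fixed, one possible (hedged) workaround, not part of this report, is to filter out the offending rows before the UDF is applied, reusing the `Divide10` helper from the reproduction above:

{code}
# Hedged workaround sketch, reusing the Divide10 helper from the reproduction
# above: drop the rows that would make the UDF fail before the UDF is applied,
# instead of relying on F.when to skip them.
from pyspark.sql import functions as F, Row, types

def Divide10():
    def fn(value): return 10 / int(value)
    return F.udf(fn, types.IntegerType())

df = sc.parallelize([Row(x=5), Row(x=0)]).toDF()
x = F.col('x')

safe = df.filter(x > 0).select(Divide10()(x).alias('result'))
safe.show()  # only rows with x > 0 reach the UDF, so no division by zero
{code}

Unlike the `F.when` expression, this drops the `x <= 0` rows instead of returning `null` for them, so it is only a partial substitute for the expected short-circuit behavior.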



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22347) UDF is evaluated when 'F.when' condition is false

2017-10-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22347:


Assignee: Apache Spark

> UDF is evaluated when 'F.when' condition is false
> -
>
> Key: SPARK-22347
> URL: https://issues.apache.org/jira/browse/SPARK-22347
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Nicolas Porter
>Assignee: Apache Spark
>Priority: Minor
>
> Here's a simple example of how to reproduce this:
> {code}
> from pyspark.sql import functions as F, Row, types
> def Divide10():
>     def fn(value): return 10 / int(value)
>     return F.udf(fn, types.IntegerType())
> df = sc.parallelize([Row(x=5), Row(x=0)]).toDF()
> x = F.col('x')
> df2 = df.select(F.when((x > 0), Divide10()(x)))
> df2.show(200)
> {code}
> This raises a division-by-zero error, even though `F.when` should filter 
> out all cases where `x <= 0`. I believe the correct behavior is not to 
> evaluate the UDF when the `F.when` condition is false.
> Interestingly, when the `F.when` condition is set to `F.lit(False)`, the 
> error is not raised and all rows resolve to `null`, which is the 
> expected result.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22347) UDF is evaluated when 'F.when' condition is false

2017-10-26 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221719#comment-16221719
 ] 

Apache Spark commented on SPARK-22347:
--

User 'viirya' has created a pull request for this issue:
https://github.com/apache/spark/pull/19584

> UDF is evaluated when 'F.when' condition is false
> -
>
> Key: SPARK-22347
> URL: https://issues.apache.org/jira/browse/SPARK-22347
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Nicolas Porter
>Priority: Minor
>
> Here's a simple example of how to reproduce this:
> {code}
> from pyspark.sql import functions as F, Row, types
> def Divide10():
>     def fn(value): return 10 / int(value)
>     return F.udf(fn, types.IntegerType())
> df = sc.parallelize([Row(x=5), Row(x=0)]).toDF()
> x = F.col('x')
> df2 = df.select(F.when((x > 0), Divide10()(x)))
> df2.show(200)
> {code}
> This raises a division-by-zero error, even though `F.when` should filter 
> out all cases where `x <= 0`. I believe the correct behavior is not to 
> evaluate the UDF when the `F.when` condition is false.
> Interestingly, when the `F.when` condition is set to `F.lit(False)`, the 
> error is not raised and all rows resolve to `null`, which is the 
> expected result.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22347) UDF is evaluated when 'F.when' condition is false

2017-10-26 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221612#comment-16221612
 ] 

Liang-Chi Hsieh edited comment on SPARK-22347 at 10/27/17 5:06 AM:
---

Under the current execution mode of Python UDFs, I think it is hard to support 
Python UDFs as branch values or the else value in a CaseWhen expression. The 
execution of batch/vectorized Python UDFs evaluates all the UDFs in an operator 
at once, so it might not be easy to support conditional execution. I'd rather 
disable the usage of Python UDFs in CaseWhen. It is very easy to incorporate 
the condition logic of CaseWhen into the Python UDF itself, e.g. for the above 
example:

{code}
def divideByValue():
    def fn(value): return 10 / int(value) if (value > 0) else None
    return udf(fn, types.IntegerType())

df2 = df.select(divideByValue()(x))
df2.show()
+-----+
|fn(x)|
+-----+
|    2|
| null|
+-----+
{code}



was (Author: viirya):
Under the current execution mode of Python UDFs, I think it is hard to support 
Python UDFs as branch values or the else value in a CaseWhen expression. The 
execution of batch/vectorized Python UDFs evaluates all the UDFs in an operator 
at once, so it might not be easy to support conditional execution. I'd rather 
disable the usage of Python UDFs in CaseWhen. It is very easy to incorporate 
the condition logic of CaseWhen into the Python UDF itself, e.g. for the above 
example:

{code}
def divideByValue():
    def fn(value): return 10 / int(value) if (value > 0) else None
    return udf(fn, types.IntegerType())

df2 = df.select(divideByValue()(x))
df2.show()
+--------------------------------+
|CASE WHEN (x > 0) THEN fn(x) END|
+--------------------------------+
|                               2|
|                            null|
+--------------------------------+
{code}


> UDF is evaluated when 'F.when' condition is false
> -
>
> Key: SPARK-22347
> URL: https://issues.apache.org/jira/browse/SPARK-22347
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Nicolas Porter
>Priority: Minor
>
> Here's a simple example of how to reproduce this:
> {code}
> from pyspark.sql import functions as F, Row, types
> def Divide10():
>     def fn(value): return 10 / int(value)
>     return F.udf(fn, types.IntegerType())
> df = sc.parallelize([Row(x=5), Row(x=0)]).toDF()
> x = F.col('x')
> df2 = df.select(F.when((x > 0), Divide10()(x)))
> df2.show(200)
> {code}
> This raises a division-by-zero error, even though `F.when` should filter 
> out all cases where `x <= 0`. I believe the correct behavior is not to 
> evaluate the UDF when the `F.when` condition is false.
> Interestingly, when the `F.when` condition is set to `F.lit(False)`, the 
> error is not raised and all rows resolve to `null`, which is the 
> expected result.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22347) UDF is evaluated when 'F.when' condition is false

2017-10-26 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221612#comment-16221612
 ] 

Liang-Chi Hsieh edited comment on SPARK-22347 at 10/27/17 5:05 AM:
---

Under the current execution mode of Python UDFs, I think it is hard to support 
Python UDFs as branch values or the else value in a CaseWhen expression. The 
execution of batch/vectorized Python UDFs evaluates all the UDFs in an operator 
at once, so it might not be easy to support conditional execution. I'd rather 
disable the usage of Python UDFs in CaseWhen. It is very easy to incorporate 
the condition logic of CaseWhen into the Python UDF itself, e.g. for the above 
example:

{code}
def divideByValue():
    def fn(value): return 10 / int(value) if (value > 0) else None
    return udf(fn, types.IntegerType())

df2 = df.select(divideByValue()(x))
df2.show()
+--------------------------------+
|CASE WHEN (x > 0) THEN fn(x) END|
+--------------------------------+
|                               2|
|                            null|
+--------------------------------+
{code}



was (Author: viirya):
Under the current execution mode of Python UDFs, I think it is hard to support 
Python UDFs as branch values or the else value in a CaseWhen expression. The 
execution of batch/vectorized Python UDFs evaluates all the UDFs in an operator 
at once, so it might not be easy to support conditional execution. I'd rather 
disable the usage of Python UDFs in CaseWhen. It is very easy to incorporate 
the condition logic of CaseWhen into the Python UDF itself, e.g. for the above 
example:

{code}
def Divide10():
    def fn(value): return 10 / int(value) if (value > 0) else None
    return udf(fn, types.IntegerType())

df2 = df.select(when((x > 0), Divide10()(x)))
df2.show()
+--------------------------------+
|CASE WHEN (x > 0) THEN fn(x) END|
+--------------------------------+
|                               2|
|                            null|
+--------------------------------+
{code}


> UDF is evaluated when 'F.when' condition is false
> -
>
> Key: SPARK-22347
> URL: https://issues.apache.org/jira/browse/SPARK-22347
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Nicolas Porter
>Priority: Minor
>
> Here's a simple example of how to reproduce this:
> {code}
> from pyspark.sql import functions as F, Row, types
> def Divide10():
>     def fn(value): return 10 / int(value)
>     return F.udf(fn, types.IntegerType())
> df = sc.parallelize([Row(x=5), Row(x=0)]).toDF()
> x = F.col('x')
> df2 = df.select(F.when((x > 0), Divide10()(x)))
> df2.show(200)
> {code}
> This raises a division-by-zero error, even though `F.when` should filter 
> out all cases where `x <= 0`. I believe the correct behavior is not to 
> evaluate the UDF when the `F.when` condition is false.
> Interestingly, when the `F.when` condition is set to `F.lit(False)`, the 
> error is not raised and all rows resolve to `null`, which is the 
> expected result.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18755) Add Randomized Grid Search to Spark ML

2017-10-26 Thread Ilya Matiach (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221687#comment-16221687
 ] 

Ilya Matiach commented on SPARK-18755:
--

I've created a PR that adds randomized grid search to the mmlspark package here:

https://github.com/Azure/mmlspark/pull/168



> Add Randomized Grid Search to Spark ML
> --
>
> Key: SPARK-18755
> URL: https://issues.apache.org/jira/browse/SPARK-18755
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Reporter: yuhao yang
>
> Randomized Grid Search  implements a randomized search over parameters, where 
> each setting is sampled from a distribution over possible parameter values. 
> This has two main benefits over an exhaustive search:
> 1. A budget can be chosen independent of the number of parameters and 
> possible values.
> 2. Adding parameters that do not influence the performance does not decrease 
> efficiency.
> Randomized grid search usually gives similar results to an exhaustive search, 
> while the run time for the randomized search is drastically lower.
> For more background, please refer to:
> sklearn: http://scikit-learn.org/stable/modules/grid_search.html
> http://blog.kaggle.com/2015/07/16/scikit-learn-video-8-efficiently-searching-for-optimal-tuning-parameters/
> http://www.jmlr.org/papers/volume13/bergstra12a/bergstra12a.pdf
> https://www.r-bloggers.com/hyperparameter-optimization-in-h2o-grid-search-random-search-and-the-future/.
> There are two ways to implement this in Spark, as I see it:
> 1. Add a searchRatio parameter to ParamGridBuilder and conduct sampling directly 
> during build (see the sketch below). Only one new public function is required.
> 2. Add a RandomizedSearch trait and create new classes RandomizedCrossValidator 
> and RandomizedTrainValidationSplit, which can be complicated since we need to 
> deal with the models.
> I'd prefer option 1 as it's much simpler and more straightforward. We can 
> support randomized grid search with a minimal change.
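To make option 1 concrete, here is a hedged sketch in PySpark of sampling a fraction of an already-built parameter grid. The helper `sample_param_grid` and the `search_ratio` argument are illustrative names, not existing Spark API, and the estimator and grid values are placeholders.

{code}
# Illustrative sketch of option 1: build the full grid with the existing
# ParamGridBuilder, then keep a random subset of the parameter maps.
# sample_param_grid and search_ratio are hypothetical names, not Spark API.
import random

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder

def sample_param_grid(param_maps, search_ratio, seed=None):
    """Return a random subset of the built parameter maps."""
    k = max(1, int(len(param_maps) * search_ratio))
    return random.Random(seed).sample(param_maps, k)

lr = LogisticRegression()
full_grid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.001, 0.01, 0.1, 1.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())

random_grid = sample_param_grid(full_grid, search_ratio=0.5, seed=42)
# random_grid can then be passed to CrossValidator(estimatorParamMaps=random_grid, ...)
{code}

Because the sampled list has the same shape as a fully built grid, it can be passed to the existing CrossValidator or TrainValidationSplit unchanged, which is what makes option 1 attractive.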



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22347) UDF is evaluated when 'F.when' condition is false

2017-10-26 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221612#comment-16221612
 ] 

Liang-Chi Hsieh edited comment on SPARK-22347 at 10/27/17 2:44 AM:
---

Under the current execution mode of Python UDFs, I think it is hard to support 
Python UDFs as branch values or the else value in a CaseWhen expression. The 
execution of batch/vectorized Python UDFs evaluates all the UDFs in an operator 
at once, so it might not be easy to support conditional execution. I'd rather 
disable the usage of Python UDFs in CaseWhen. It is very easy to incorporate 
the condition logic of CaseWhen into the Python UDF itself, e.g. for the above 
example:

{code}
def Divide10():
    def fn(value): return 10 / int(value) if (value > 0) else None
    return udf(fn, types.IntegerType())

df2 = df.select(when((x > 0), Divide10()(x)))
df2.show()
+--------------------------------+
|CASE WHEN (x > 0) THEN fn(x) END|
+--------------------------------+
|                               2|
|                            null|
+--------------------------------+
{code}



was (Author: viirya):
Under the current execution mode of Python UDFs, I think it is hard to support 
Python UDFs as branch values or the else value in a CaseWhen expression. The 
execution of batch/vectorized Python UDFs evaluates all the UDFs in an operator 
at once, so it might not be easy to support conditional execution. I'd rather 
disable the usage of Python UDFs in CaseWhen. It is very easy to incorporate 
the condition logic of CaseWhen into the Python UDF itself, e.g. for the above 
example:

{code}
def Divide10():
    def fn(value): return 10 / int(value) if (value > 0) else None
    return udf(fn, types.IntegerType())

df2 = df.select(when((x > 0), Divide10()(x)))
df2.show()
+--------------------------------+
|CASE WHEN (x > 0) THEN fn(x) END|
+--------------------------------+
|                               2|
|                            null|
+--------------------------------+
{code}


> UDF is evaluated when 'F.when' condition is false
> -
>
> Key: SPARK-22347
> URL: https://issues.apache.org/jira/browse/SPARK-22347
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Nicolas Porter
>Priority: Minor
>
> Here's a simple example of how to reproduce this:
> {code}
> from pyspark.sql import functions as F, Row, types
> def Divide10():
>     def fn(value): return 10 / int(value)
>     return F.udf(fn, types.IntegerType())
> df = sc.parallelize([Row(x=5), Row(x=0)]).toDF()
> x = F.col('x')
> df2 = df.select(F.when((x > 0), Divide10()(x)))
> df2.show(200)
> {code}
> This raises a division-by-zero error, even though `F.when` should filter 
> out all cases where `x <= 0`. I believe the correct behavior is not to 
> evaluate the UDF when the `F.when` condition is false.
> Interestingly, when the `F.when` condition is set to `F.lit(False)`, the 
> error is not raised and all rows resolve to `null`, which is the 
> expected result.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22347) UDF is evaluated when 'F.when' condition is false

2017-10-26 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221612#comment-16221612
 ] 

Liang-Chi Hsieh commented on SPARK-22347:
-

Under the current execution mode of Python UDFs, I think it is hard to support 
Python UDFs as branch values or the else value in a CaseWhen expression. The 
execution of batch/vectorized Python UDFs evaluates all the UDFs in an operator 
at once, so it might not be easy to support conditional execution. I'd rather 
disable the usage of Python UDFs in CaseWhen. It is very easy to incorporate 
the condition logic of CaseWhen into the Python UDF itself, e.g. for the above 
example:

{code}
def Divide10():
    def fn(value): return 10 / int(value) if (value > 0) else None
    return udf(fn, types.IntegerType())

df2 = df.select(when((x > 0), Divide10()(x)))
df2.show()
+--------------------------------+
|CASE WHEN (x > 0) THEN fn(x) END|
+--------------------------------+
|                               2|
|                            null|
+--------------------------------+
{code}


> UDF is evaluated when 'F.when' condition is false
> -
>
> Key: SPARK-22347
> URL: https://issues.apache.org/jira/browse/SPARK-22347
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Nicolas Porter
>Priority: Minor
>
> Here's a simple example of how to reproduce this:
> {code}
> from pyspark.sql import functions as F, Row, types
> def Divide10():
>     def fn(value): return 10 / int(value)
>     return F.udf(fn, types.IntegerType())
> df = sc.parallelize([Row(x=5), Row(x=0)]).toDF()
> x = F.col('x')
> df2 = df.select(F.when((x > 0), Divide10()(x)))
> df2.show(200)
> {code}
> This raises a division-by-zero error, even though `F.when` should filter 
> out all cases where `x <= 0`. I believe the correct behavior is not to 
> evaluate the UDF when the `F.when` condition is false.
> Interestingly, when the `F.when` condition is set to `F.lit(False)`, the 
> error is not raised and all rows resolve to `null`, which is the 
> expected result.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-22355) Dataset.collect is not threadsafe

2017-10-26 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-22355.
-
   Resolution: Fixed
Fix Version/s: 2.3.0
   2.2.1

> Dataset.collect is not threadsafe
> -
>
> Key: SPARK-22355
> URL: https://issues.apache.org/jira/browse/SPARK-22355
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
> Fix For: 2.2.1, 2.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-22356) data source table should support overlapped columns between data and partition schema

2017-10-26 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-22356.
-
   Resolution: Fixed
Fix Version/s: 2.3.0
   2.2.1

> data source table should support overlapped columns between data and 
> partition schema
> -
>
> Key: SPARK-22356
> URL: https://issues.apache.org/jira/browse/SPARK-22356
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
> Fix For: 2.2.1, 2.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-22366) Support ignoreMissingFiles flag parallel to ignoreCorruptFiles

2017-10-26 Thread Shixiong Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu resolved SPARK-22366.
--
   Resolution: Fixed
 Assignee: Jose Torres
Fix Version/s: 2.3.0

> Support ignoreMissingFiles flag parallel to ignoreCorruptFiles
> --
>
> Key: SPARK-22366
> URL: https://issues.apache.org/jira/browse/SPARK-22366
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Jose Torres
>Assignee: Jose Torres
>Priority: Minor
> Fix For: 2.3.0
>
>
> There's an existing flag "spark.sql.files.ignoreCorruptFiles" that will 
> quietly ignore attempted reads from files that have been corrupted, but it 
> still allows the query to fail on missing files. Being able to ignore missing 
> files too is useful in some replication scenarios.
> We should add a "spark.sql.files.ignoreMissingFiles" to fill out the 
> functionality.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22344) Prevent R CMD check from using /tmp

2017-10-26 Thread Hossein Falaki (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221460#comment-16221460
 ] 

Hossein Falaki commented on SPARK-22344:


I don't have a solid pointer as to why we are creating these temp directories 
for Hive. I think it would be nicer to fix them in Spark; it is good practice 
not to leave files in /tmp.

> Prevent R CMD check from using /tmp
> ---
>
> Key: SPARK-22344
> URL: https://issues.apache.org/jira/browse/SPARK-22344
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 1.6.3, 2.1.2, 2.2.0, 2.3.0
>Reporter: Shivaram Venkataraman
>
> When R CMD check is run on the SparkR package it leaves behind files in /tmp 
> which is a violation of CRAN policy. We should instead write to Rtmpdir. 
> Notes from CRAN are below
> {code}
> Checking this leaves behind dirs
>hive/$USER
>$USER
> and files named like
>b4f6459b-0624-4100-8358-7aa7afbda757_resources
> in /tmp, in violation of the CRAN Policy.
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22305) HDFSBackedStateStoreProvider fails with StackOverflowException when attempting to recover state

2017-10-26 Thread Shixiong Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221432#comment-16221432
 ] 

Shixiong Zhu commented on SPARK-22305:
--

[~Yuval.Itzchakov] How many batches per minute does your query run? If there 
are a lot of batches, you can try running your application with `--conf 
spark.sql.streaming.stateStore.maintenanceInterval=10s` to set a smaller 
interval as a workaround.

However, we should definitely fix this by rewriting this code in a 
non-recursive way.
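A minimal sketch of applying that workaround when building the session; only the config key and value come from the comment above, the rest of the setup is illustrative:

{code}
# Minimal sketch, assuming a typical Structured Streaming application. Only the
# config key/value are from the comment above; the app setup is illustrative.
# A shorter maintenance interval makes state store snapshots run more often,
# keeping the delta-file chain that must be replayed on recovery short.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("streaming-app")
         .config("spark.sql.streaming.stateStore.maintenanceInterval", "10s")
         .getOrCreate())
{code}

The same setting can also be passed on the command line via `--conf`, as in the comment above.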


> HDFSBackedStateStoreProvider fails with StackOverflowException when 
> attempting to recover state
> ---
>
> Key: SPARK-22305
> URL: https://issues.apache.org/jira/browse/SPARK-22305
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Yuval Itzchakov
>
> Environment:
> Spark: 2.2.0
> Java version: 1.8.0_112
> spark.sql.streaming.minBatchesToRetain: 100
> After an application failure due to OOM exceptions, restarting the 
> application with the existing state produces the following error:
> {code:java}
> java.io.IOException: com.google.protobuf.ServiceException: 
> java.lang.StackOverflowError
>   at 
> org.apache.hadoop.ipc.ProtobufHelper.getRemoteException(ProtobufHelper.java:47)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:260)
>   at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>   at com.sun.proxy.$Proxy18.getBlockLocations(Unknown Source)
>   at 
> org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1240)
>   at 
> org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1227)
>   at 
> org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1215)
>   at 
> org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:303)
>   at 
> org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:269)
>   at org.apache.hadoop.hdfs.DFSInputStream.(DFSInputStream.java:261)
>   at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1540)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>   at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>   at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
>   at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$readSnapshotFile(HDFSBackedStateStoreProvider.scala:405)
>   at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:296)
>   at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:295)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:295)
>   at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$4.apply(HDFSBackedStateStoreProvider.scala:297)
>   at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$4.apply(HDFSBackedStateStoreProvider.scala:296)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:296)
>   at 
> 

[jira] [Assigned] (SPARK-22339) Push epoch updates to executors on fetch failure to avoid fetch retries for missing executors

2017-10-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22339:


Assignee: (was: Apache Spark)

> Push epoch updates to executors on fetch failure to avoid fetch retries for 
> missing executors
> -
>
> Key: SPARK-22339
> URL: https://issues.apache.org/jira/browse/SPARK-22339
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 2.2.0
>Reporter: Juan Rodríguez Hortalá
> Attachments: push_epoch_update-WIP.diff
>
>
> When a task finishes with error due to a fetch error, then DAGScheduler 
> unregisters the shuffle blocks hosted by the serving executor (or even all 
> the executors in the failing host, with external shuffle and 
> spark.files.fetchFailure.unRegisterOutputOnHost enabled) in the shuffle block 
> directory stored by MapOutputTracker, that then increments its epoch as a 
> result. This event is only signaled to the other executors when a new task 
> with a new epoch starts in each executor. This means that other executors 
> reading from the failed executors will retry fetching shuffle blocks from 
> them, even though the driver already knows those executors are lost and those 
> blocks are now unavailable at those locations. This impacts job runtime, 
> especially for long shuffles and executor failures at the end of a stage, when 
> the only pending tasks are shuffle reads. 
> This could be improved by pushing the epoch update to the executors without 
> having to wait for a new task. In the attached patch I sketch a possible 
> solution that sends the updated epoch from the driver to the executors by 
> piggybacking on the executor heartbeat response. ShuffleBlockFetcherIterator, 
> RetryingBlockFetcher and BlockFetchingListener are modified so block 
> locations are checked on each fetch retry. This doesn't introduce additional 
> traffic, as MapOutputTrackerWorker.mapStatuses is shared by all tasks running 
> on the same Executor, and the lookup of the new shuffle blocks directory was 
> going to happen anyway when the new epoch is detected during the start of the 
> next task. 
> I would like to know the opinion of the community on this approach. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22339) Push epoch updates to executors on fetch failure to avoid fetch retries for missing executors

2017-10-26 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221423#comment-16221423
 ] 

Apache Spark commented on SPARK-22339:
--

User 'juanrh' has created a pull request for this issue:
https://github.com/apache/spark/pull/19583

> Push epoch updates to executors on fetch failure to avoid fetch retries for 
> missing executors
> -
>
> Key: SPARK-22339
> URL: https://issues.apache.org/jira/browse/SPARK-22339
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 2.2.0
>Reporter: Juan Rodríguez Hortalá
> Attachments: push_epoch_update-WIP.diff
>
>
> When a task finishes with error due to a fetch error, then DAGScheduler 
> unregisters the shuffle blocks hosted by the serving executor (or even all 
> the executors in the failing host, with external shuffle and 
> spark.files.fetchFailure.unRegisterOutputOnHost enabled) in the shuffle block 
> directory stored by MapOutputTracker, that then increments its epoch as a 
> result. This event is only signaled to the other executors when a new task 
> with a new epoch starts in each executor. This means that other executors 
> reading from the failed executors will retry fetching shuffle blocks from 
> them, even though the driver already knows those executors are lost and those 
> blocks are now unavailable at those locations. This impacts job runtime, 
> especially for long shuffles and executor failures at the end of a stage, when 
> the only pending tasks are shuffle reads. 
> This could be improved by pushing the epoch update to the executors without 
> having to wait for a new task. In the attached patch I sketch a possible 
> solution that sends the updated epoch from the driver to the executors by 
> piggybacking on the executor heartbeat response. ShuffleBlockFetcherIterator, 
> RetryingBlockFetcher and BlockFetchingListener are modified so block 
> locations are checked on each fetch retry. This doesn't introduce additional 
> traffic, as MapOutputTrackerWorker.mapStatuses is shared by all tasks running 
> on the same Executor, and the lookup of the new shuffle blocks directory was 
> going to happen anyway when the new epoch is detected during the start of the 
> next task. 
> I would like to know the opinion of the community on this approach. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22339) Push epoch updates to executors on fetch failure to avoid fetch retries for missing executors

2017-10-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22339:


Assignee: Apache Spark

> Push epoch updates to executors on fetch failure to avoid fetch retries for 
> missing executors
> -
>
> Key: SPARK-22339
> URL: https://issues.apache.org/jira/browse/SPARK-22339
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 2.2.0
>Reporter: Juan Rodríguez Hortalá
>Assignee: Apache Spark
> Attachments: push_epoch_update-WIP.diff
>
>
> When a task finishes with error due to a fetch error, then DAGScheduler 
> unregisters the shuffle blocks hosted by the serving executor (or even all 
> the executors in the failing host, with external shuffle and 
> spark.files.fetchFailure.unRegisterOutputOnHost enabled) in the shuffle block 
> directory stored by MapOutputTracker, that then increments its epoch as a 
> result. This event is only signaled to the other executors when a new task 
> with a new epoch starts in each executor. This means that other executors 
> reading from the failed executors will retry fetching shuffle blocks from 
> them, even though the driver already knows those executors are lost and those 
> blocks are now unavailable at those locations. This impacts job runtime, 
> especially for long shuffles and executor failures at the end of a stage, when 
> the only pending tasks are shuffle reads. 
> This could be improved by pushing the epoch update to the executors without 
> having to wait for a new task. In the attached patch I sketch a possible 
> solution that sends the updated epoch from the driver to the executors by 
> piggybacking on the executor heartbeat response. ShuffleBlockFetcherIterator, 
> RetryingBlockFetcher and BlockFetchingListener are modified so block 
> locations are checked on each fetch retry. This doesn't introduce additional 
> traffic, as MapOutputTrackerWorker.mapStatuses is shared by all tasks running 
> on the same Executor, and the lookup of the new shuffle blocks directory was 
> going to happen anyway when the new epoch is detected during the start of the 
> next task. 
> I would like to know the opinion of the community on this approach. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-22131) Add Mesos Secrets Support to the Mesos Driver

2017-10-26 Thread Marcelo Vanzin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin resolved SPARK-22131.

   Resolution: Fixed
 Assignee: Susan X. Huynh
Fix Version/s: 2.3.0

> Add Mesos Secrets Support to the Mesos Driver
> -
>
> Key: SPARK-22131
> URL: https://issues.apache.org/jira/browse/SPARK-22131
> Project: Spark
>  Issue Type: New Feature
>  Components: Mesos
>Affects Versions: 2.3.0
>Reporter: Arthur Rand
>Assignee: Susan X. Huynh
> Fix For: 2.3.0
>
>
> We recently added Secrets support to the Dispatcher (SPARK-20812). In order 
> to have Driver-to-Executor TLS we need the same support in the Mesos Driver 
> so a secret can be disseminated to the executors. This JIRA is to move the 
> current secrets implementation to be used by both frameworks. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter

2017-10-26 Thread Ilya Matiach (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221388#comment-16221388
 ] 

Ilya Matiach commented on SPARK-22357:
--

binaryFiles ignores the number of partitions I want to have, even if I specify 
the value.  I have to repartition the returned DataFrame.  In my specific case, 
the number of partitions was very small, which caused performance issues.  I 
needed to increase the number of partitions by repartitioning the DataFrame 
after it was constructed, but this can be expensive - it would be better to 
create the DataFrame with the user-specified number of partitions.
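A hedged sketch of that workaround in PySpark; the path and the partition count are placeholders, not values from this issue:

{code}
# Hedged sketch of the workaround described above; the path and partition
# count are placeholders. Since the minPartitions hint is effectively ignored,
# explicitly repartition the RDD returned by binaryFiles.
rdd = sc.binaryFiles("hdfs:///data/images", minPartitions=64)
rdd = rdd.repartition(64)  # extra shuffle, but restores the desired parallelism
{code}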

> SparkContext.binaryFiles ignore minPartitions parameter
> ---
>
> Key: SPARK-22357
> URL: https://issues.apache.org/jira/browse/SPARK-22357
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.0
>Reporter: Weichen Xu
>
> This is a bug in binaryFiles - even though we give it the partitions, 
> binaryFiles ignores them.
> The bug was introduced in Spark 2.1 (coming from Spark 2.0): in the file 
> PortableDataStream.scala the argument “minPartitions” is no longer used (with 
> the push to master on 11/7/6):
> {code}
> /**
>  * Allow minPartitions set by end-user in order to keep compatibility with
>  * the old Hadoop API, which is set through setMaxSplitSize.
>  */
> def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
>   val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
>   val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
>   val defaultParallelism = sc.defaultParallelism
>   val files = listStatus(context).asScala
>   val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
>   val bytesPerCore = totalBytes / defaultParallelism
>   val maxSplitSize = Math.min(defaultMaxSplitBytes,
>     Math.max(openCostInBytes, bytesPerCore))
>   super.setMaxSplitSize(maxSplitSize)
> }
> {code}
> The code previously, in version 2.0, was:
> {code}
> def setMinPartitions(context: JobContext, minPartitions: Int) {
>   val totalLen = listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
>   val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
>   super.setMaxSplitSize(maxSplitSize)
> }
> {code}
> The new code is very smart, but it ignores what the user passes in and uses 
> the data size instead, which is a breaking change in some sense.
> In our specific case this was a problem: we initially read in just the file 
> names, and only after that does the DataFrame become very large, when 
> reading in the images themselves – and in this case the new code does not 
> handle the partitioning very well.
> I’m not sure if it can be easily fixed because I don’t understand the full 
> context of the change in Spark (but at the very least the unused parameter 
> should be removed to avoid confusion).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-20644) Hook up Spark UI to the new key-value store backend

2017-10-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-20644:


Assignee: Apache Spark

> Hook up Spark UI to the new key-value store backend
> ---
>
> Key: SPARK-20644
> URL: https://issues.apache.org/jira/browse/SPARK-20644
> Project: Spark
>  Issue Type: Sub-task
>  Components: Web UI
>Affects Versions: 2.3.0
>Reporter: Marcelo Vanzin
>Assignee: Apache Spark
>
> See spec in parent issue (SPARK-18085) for more details.
> This task tracks hooking up the Spark UI (both live and SHS) to the key-value 
> store based backend. It's the initial work to allow individual UI pages to be 
> de-coupled from the listener implementations and use the REST API data saved 
> in the store.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-20644) Hook up Spark UI to the new key-value store backend

2017-10-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-20644:


Assignee: (was: Apache Spark)

> Hook up Spark UI to the new key-value store backend
> ---
>
> Key: SPARK-20644
> URL: https://issues.apache.org/jira/browse/SPARK-20644
> Project: Spark
>  Issue Type: Sub-task
>  Components: Web UI
>Affects Versions: 2.3.0
>Reporter: Marcelo Vanzin
>
> See spec in parent issue (SPARK-18085) for more details.
> This task tracks hooking up the Spark UI (both live and SHS) to the key-value 
> store based backend. It's the initial work to allow individual UI pages to be 
> de-coupled from the listener implementations and use the REST API data saved 
> in the store.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20644) Hook up Spark UI to the new key-value store backend

2017-10-26 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221372#comment-16221372
 ] 

Apache Spark commented on SPARK-20644:
--

User 'vanzin' has created a pull request for this issue:
https://github.com/apache/spark/pull/19582

> Hook up Spark UI to the new key-value store backend
> ---
>
> Key: SPARK-20644
> URL: https://issues.apache.org/jira/browse/SPARK-20644
> Project: Spark
>  Issue Type: Sub-task
>  Components: Web UI
>Affects Versions: 2.3.0
>Reporter: Marcelo Vanzin
>
> See spec in parent issue (SPARK-18085) for more details.
> This task tracks hooking up the Spark UI (both live and SHS) to the key-value 
> store based backend. It's the initial work to allow individual UI pages to be 
> de-coupled from the listener implementations and use the REST API data saved 
> in the store.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22366) Support ignoreMissingFiles flag parallel to ignoreCorruptFiles

2017-10-26 Thread Shixiong Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu updated SPARK-22366:
-
Component/s: (was: Spark Core)
 SQL

> Support ignoreMissingFiles flag parallel to ignoreCorruptFiles
> --
>
> Key: SPARK-22366
> URL: https://issues.apache.org/jira/browse/SPARK-22366
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Jose Torres
>Priority: Minor
>
> There's an existing flag "spark.sql.files.ignoreCorruptFiles" that will 
> quietly ignore attempted reads from files that have been corrupted, but it 
> still allows the query to fail on missing files. Being able to ignore missing 
> files too is useful in some replication scenarios.
> We should add a "spark.sql.files.ignoreMissingFiles" to fill out the 
> functionality.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22366) Support ignoreMissingFiles flag parallel to ignoreCorruptFiles

2017-10-26 Thread Shixiong Zhu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu updated SPARK-22366:
-
Issue Type: Improvement  (was: Bug)

> Support ignoreMissingFiles flag parallel to ignoreCorruptFiles
> --
>
> Key: SPARK-22366
> URL: https://issues.apache.org/jira/browse/SPARK-22366
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Jose Torres
>Priority: Minor
>
> There's an existing flag "spark.sql.files.ignoreCorruptFiles" that will 
> quietly ignore attempted reads from files that have been corrupted, but it 
> still allows the query to fail on missing files. Being able to ignore missing 
> files too is useful in some replication scenarios.
> We should add a "spark.sql.files.ignoreMissingFiles" to fill out the 
> functionality.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22366) Support ignoreMissingFiles flag parallel to ignoreCorruptFiles

2017-10-26 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221225#comment-16221225
 ] 

Apache Spark commented on SPARK-22366:
--

User 'joseph-torres' has created a pull request for this issue:
https://github.com/apache/spark/pull/19581

> Support ignoreMissingFiles flag parallel to ignoreCorruptFiles
> --
>
> Key: SPARK-22366
> URL: https://issues.apache.org/jira/browse/SPARK-22366
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Jose Torres
>Priority: Minor
>
> There's an existing flag "spark.sql.files.ignoreCorruptFiles" that will 
> quietly ignore attempted reads from files that have been corrupted, but it 
> still allows the query to fail on missing files. Being able to ignore missing 
> files too is useful in some replication scenarios.
> We should add a "spark.sql.files.ignoreMissingFiles" to fill out the 
> functionality.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22366) Support ignoreMissingFiles flag parallel to ignoreCorruptFiles

2017-10-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22366:


Assignee: Apache Spark

> Support ignoreMissingFiles flag parallel to ignoreCorruptFiles
> --
>
> Key: SPARK-22366
> URL: https://issues.apache.org/jira/browse/SPARK-22366
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Jose Torres
>Assignee: Apache Spark
>Priority: Minor
>
> There's an existing flag "spark.sql.files.ignoreCorruptFiles" that will 
> quietly ignore attempted reads from files that have been corrupted, but it 
> still allows the query to fail on missing files. Being able to ignore missing 
> files too is useful in some replication scenarios.
> We should add a "spark.sql.files.ignoreMissingFiles" to fill out the 
> functionality.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22366) Support ignoreMissingFiles flag parallel to ignoreCorruptFiles

2017-10-26 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-22366:


Assignee: (was: Apache Spark)

> Support ignoreMissingFiles flag parallel to ignoreCorruptFiles
> --
>
> Key: SPARK-22366
> URL: https://issues.apache.org/jira/browse/SPARK-22366
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Jose Torres
>Priority: Minor
>
> There's an existing flag "spark.sql.files.ignoreCorruptFiles" that will 
> quietly ignore attempted reads from files that have been corrupted, but it 
> still allows the query to fail on missing files. Being able to ignore missing 
> files too is useful in some replication scenarios.
> We should add a "spark.sql.files.ignoreMissingFiles" to fill out the 
> functionality.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22366) Support ignoreMissingFiles flag parallel to ignoreCorruptFiles

2017-10-26 Thread Jose Torres (JIRA)
Jose Torres created SPARK-22366:
---

 Summary: Support ignoreMissingFiles flag parallel to 
ignoreCorruptFiles
 Key: SPARK-22366
 URL: https://issues.apache.org/jira/browse/SPARK-22366
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0
Reporter: Jose Torres
Priority: Minor


There's an existing flag "spark.sql.files.ignoreCorruptFiles" that will quietly 
ignore attempted reads from files that have been corrupted, but it still allows 
the query to fail on missing files. Being able to ignore missing files too is 
useful in some replication scenarios.

We should add a "spark.sql.files.ignoreMissingFiles" to fill out the 
functionality.
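For reference, a hedged sketch of how the two flags would be used together. `spark.sql.files.ignoreCorruptFiles` is the existing option described above; `spark.sql.files.ignoreMissingFiles` is only the key proposed in this ticket, shown to illustrate the intended symmetry, and the read path is a placeholder.

{code}
# Sketch of the intended usage. ignoreCorruptFiles is an existing option;
# ignoreMissingFiles is the key proposed in this ticket and may not exist yet.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")  # proposed flag
df = spark.read.parquet("hdfs:///replicated/dataset")  # placeholder path
{code}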



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-22328) ClosureCleaner misses referenced superclass fields, gives them null values

2017-10-26 Thread Wenchen Fan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-22328.
-
   Resolution: Fixed
Fix Version/s: 2.3.0
   2.2.1

Issue resolved by pull request 19556
[https://github.com/apache/spark/pull/19556]

> ClosureCleaner misses referenced superclass fields, gives them null values
> --
>
> Key: SPARK-22328
> URL: https://issues.apache.org/jira/browse/SPARK-22328
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Ryan Williams
> Fix For: 2.2.1, 2.3.0
>
>
> [Runnable repro 
> here|https://github.com/ryan-williams/spark-bugs/tree/closure]:
> Superclass with some fields:
> {code}
> abstract class App extends Serializable {
>   // SparkContext stub
>   @transient lazy val sc = new SparkContext(new 
> SparkConf().setAppName("test").setMaster("local[4]").set("spark.ui.showConsoleProgress",
>  "false"))
>   // These fields get missed by the ClosureCleaner in some situations
>   val n1 = 111
>   val s1 = "aaa"
>   // Simple scaffolding to exercise passing a closure to RDD.foreach in 
> subclasses
>   def rdd = sc.parallelize(1 to 1)
>   def run(name: String): Unit = {
> print(s"$name:\t")
> body()
> sc.stop()
>   }
>   def body(): Unit
> }
> {code}
> Running a simple Spark job with various instantiations of this class:
> {code}
> object Main {
>   /** [[App]]s generated this way will not correctly detect references to 
> [[App.n1]] in Spark closures */
>   val fn = () ⇒ new App {
> val n2 = 222
> val s2 = "bbb"
> def body(): Unit = rdd.foreach { _ ⇒ println(s"$n1, $n2, $s1, $s2") }
>   }
>   /** Doesn't serialize closures correctly */
>   val app1 = fn()
>   /** Works fine */
>   val app2 =
> new App {
>   val n2 = 222
>   val s2 = "bbb"
>   def body(): Unit = rdd.foreach { _ ⇒ println(s"$n1, $n2, $s1, $s2") }
> }
>   /** [[App]]s created this way also work fine */
>   def makeApp(): App =
> new App {
>   val n2 = 222
>   val s2 = "bbb"
>   def body(): Unit = rdd.foreach { _ ⇒ println(s"$n1, $n2, $s1, $s2") }
> }
>   val app3 = makeApp()  // ok
>   val fn2 = () ⇒ makeApp()  // ok
>   def main(args: Array[String]): Unit = {
> fn().run("fn")// bad: n1 → 0, s1 → null
> app1.run("app1")  // bad: n1 → 0, s1 → null
> app2.run("app2")  // ok
> app3.run("app3")  // ok
> fn2().run("fn2")  // ok
>   }
> }
> {code}
> Build + Run:
> {code}
> $ sbt run
> …
> fn:   0, 222, null, bbb
> app1: 0, 222, null, bbb
> app2: 111, 222, aaa, bbb
> app3: 111, 222, aaa, bbb
> fn2:  111, 222, aaa, bbb
> {code}
> The first two versions have {{0}} and {{null}}, resp., for the {{A.n1}} and 
> {{A.s1}} fields.
> Something about this syntax causes the problem:
> {code}
> () => new App { … }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-22328) ClosureCleaner misses referenced superclass fields, gives them null values

2017-10-26 Thread Wenchen Fan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-22328:
---

Assignee: Liang-Chi Hsieh

> ClosureCleaner misses referenced superclass fields, gives them null values
> --
>
> Key: SPARK-22328
> URL: https://issues.apache.org/jira/browse/SPARK-22328
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Ryan Williams
>Assignee: Liang-Chi Hsieh
> Fix For: 2.2.1, 2.3.0
>
>
> [Runnable repro 
> here|https://github.com/ryan-williams/spark-bugs/tree/closure]:
> Superclass with some fields:
> {code}
> abstract class App extends Serializable {
>   // SparkContext stub
>   @transient lazy val sc = new SparkContext(new 
> SparkConf().setAppName("test").setMaster("local[4]").set("spark.ui.showConsoleProgress",
>  "false"))
>   // These fields get missed by the ClosureCleaner in some situations
>   val n1 = 111
>   val s1 = "aaa"
>   // Simple scaffolding to exercise passing a closure to RDD.foreach in 
> subclasses
>   def rdd = sc.parallelize(1 to 1)
>   def run(name: String): Unit = {
> print(s"$name:\t")
> body()
> sc.stop()
>   }
>   def body(): Unit
> }
> {code}
> Running a simple Spark job with various instantiations of this class:
> {code}
> object Main {
>   /** [[App]]s generated this way will not correctly detect references to 
> [[App.n1]] in Spark closures */
>   val fn = () ⇒ new App {
> val n2 = 222
> val s2 = "bbb"
> def body(): Unit = rdd.foreach { _ ⇒ println(s"$n1, $n2, $s1, $s2") }
>   }
>   /** Doesn't serialize closures correctly */
>   val app1 = fn()
>   /** Works fine */
>   val app2 =
> new App {
>   val n2 = 222
>   val s2 = "bbb"
>   def body(): Unit = rdd.foreach { _ ⇒ println(s"$n1, $n2, $s1, $s2") }
> }
>   /** [[App]]s created this way also work fine */
>   def makeApp(): App =
> new App {
>   val n2 = 222
>   val s2 = "bbb"
>   def body(): Unit = rdd.foreach { _ ⇒ println(s"$n1, $n2, $s1, $s2") }
> }
>   val app3 = makeApp()  // ok
>   val fn2 = () ⇒ makeApp()  // ok
>   def main(args: Array[String]): Unit = {
> fn().run("fn")// bad: n1 → 0, s1 → null
> app1.run("app1")  // bad: n1 → 0, s1 → null
> app2.run("app2")  // ok
> app3.run("app3")  // ok
> fn2().run("fn2")  // ok
>   }
> }
> {code}
> Build + Run:
> {code}
> $ sbt run
> …
> fn:   0, 222, null, bbb
> app1: 0, 222, null, bbb
> app2: 111, 222, aaa, bbb
> app3: 111, 222, aaa, bbb
> fn2:  111, 222, aaa, bbb
> {code}
> The first two versions have {{0}} and {{null}}, resp., for the {{A.n1}} and 
> {{A.s1}} fields.
> Something about this syntax causes the problem:
> {code}
> () => new App { … }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter

2017-10-26 Thread Bo Meng (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221078#comment-16221078
 ] 

Bo Meng commented on SPARK-22357:
-

A quick fix could be as follows; correct me if I am wrong:
val defaultParallelism = Math.max(sc.defaultParallelism, minPartitions)

> SparkContext.binaryFiles ignore minPartitions parameter
> ---
>
> Key: SPARK-22357
> URL: https://issues.apache.org/jira/browse/SPARK-22357
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.0
>Reporter: Weichen Xu
>
> This is a bug in binaryFiles - even though we give it the partitions, 
> binaryFiles ignores them.
> The bug was introduced in Spark 2.1 (coming from Spark 2.0): in the file 
> PortableDataStream.scala the argument “minPartitions” is no longer used (with 
> the push to master on 11/7/6):
> {code}
> /**
> Allow minPartitions set by end-user in order to keep compatibility with old 
> Hadoop API
> which is set through setMaxSplitSize
> */
> def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: 
> Int) {
> val defaultMaxSplitBytes = 
> sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
> val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
> val defaultParallelism = sc.defaultParallelism
> val files = listStatus(context).asScala
> val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + 
> openCostInBytes).sum
> val bytesPerCore = totalBytes / defaultParallelism
> val maxSplitSize = Math.min(defaultMaxSplitBytes, 
> Math.max(openCostInBytes, bytesPerCore))
> super.setMaxSplitSize(maxSplitSize)
> }
> {code}
> The code previously, in version 2.0, was:
> {code}
> def setMinPartitions(context: JobContext, minPartitions: Int) {
> val totalLen = 
> listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
> val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 
> 1.0)).toLong
> super.setMaxSplitSize(maxSplitSize)
> }
> {code}
> The new code is very smart, but it ignores what the user passes in and uses 
> the data size, which is kind of a breaking change in some sense.
> In our specific case this was a problem, because we initially read in just 
> the files names and only after that the dataframe becomes very large, when 
> reading in the images themselves – and in this case the new code does not 
> handle the partitioning very well.
> I’m not sure if it can be easily fixed because I don’t understand the full 
> context of the change in Spark (but at the very least the unused parameter 
> should be removed to avoid confusion).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20928) SPIP: Continuous Processing Mode for Structured Streaming

2017-10-26 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221025#comment-16221025
 ] 

Cody Koeninger commented on SPARK-20928:


No, it doesn't exist yet as far as I know.

Reason I ask is that Michael had said on the dev list in September "I
think that we are going to have to change the Sink API as part of
SPARK-20928, which is why I linked these tickets together."

For aggregates, conceptually I think that the minimum and maximum per-partition
Kafka offset for any data involved in the aggregate is sufficient to identify
it.  But it seems like map-only is the bigger
focus here, which is probably fine.
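For illustration of that idea only (the names here are hypothetical, not an existing Spark API),
the metadata handed to a sink for a commit could look roughly like this:

{code}
// Hypothetical sketch, not a Spark API: identify the data behind a commit,
// including an aggregate's inputs, by its min/max Kafka offset per partition.
case class KafkaOffsetRange(topic: String, partition: Int, minOffset: Long, maxOffset: Long)
case class CommitMetadata(epochId: Long, inputs: Seq[KafkaOffsetRange])
{code}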




> SPIP: Continuous Processing Mode for Structured Streaming
> -
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>  Labels: SPIP
> Attachments: Continuous Processing in Structured Streaming Design 
> Sketch.pdf
>
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user-requested change in parallelism).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22184) GraphX fails in case of insufficient memory and checkpoints enabled

2017-10-26 Thread Sergey Zhemzhitsky (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221019#comment-16221019
 ] 

Sergey Zhemzhitsky commented on SPARK-22184:


Hello guys, is there a chance for this issue to be reviewed, along with the 
corresponding PR?
It would be really great for the fix to be included in Spark 2.2.1/2.3.0.

> GraphX fails in case of insufficient memory and checkpoints enabled
> ---
>
> Key: SPARK-22184
> URL: https://issues.apache.org/jira/browse/SPARK-22184
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>
> GraphX fails with FileNotFoundException in case of insufficient memory when 
> checkpoints are enabled.
> Here is the stacktrace 
> {code}
> Job aborted due to stage failure: Task creation failed: 
> java.io.FileNotFoundException: File 
> file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0
>  does not exist
> java.io.FileNotFoundException: File 
> file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0
>  does not exist
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
>   at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
>   at 
> org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>   at scala.Option.map(Option.scala:146)
>   at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697)
> ...
> {code}
> As GraphX uses cached RDDs intensively, the issue is only reproducible when 
> previously cached and checkpointed Vertex and Edge RDDs are evicted from 
> memory and forced to be read from disk. 
> For testing purposes, the following parameters may be set to emulate a low-memory 
> environment:
> {code}
> val sparkConf = new SparkConf()
>   .set("spark.graphx.pregel.checkpointInterval", "2")
>   // set testing memory to evict cached RDDs from it and force
>   // reading checkpointed RDDs from disk
>   .set("spark.testing.reservedMemory", "128")
>   .set("spark.testing.memory", "256")
> {code}
> This issue also includes SPARK-22150 and cannot be fixed until SPARK-22150 is 
> fixed too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22150) PeriodicCheckpointer fails with FileNotFoundException in case of dependant RDDs

2017-10-26 Thread Sergey Zhemzhitsky (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221018#comment-16221018
 ] 

Sergey Zhemzhitsky commented on SPARK-22150:


Hello guys, is there a chance for this issue to be reviewed, along with the 
corresponding PR?
It would be really great for the fix to be included in Spark 2.2.1/2.3.0.

> PeriodicCheckpointer fails with FileNotFoundException in case of dependant 
> RDDs
> ---
>
> Key: SPARK-22150
> URL: https://issues.apache.org/jira/browse/SPARK-22150
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 
> 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>
> PeriodicCheckpointer fails with FileNotFoundException in case of 
> checkpointing dependant RDDs (consider iterative algorithms), i.e. when the 
> RDD to checkpoint depends on an already checkpointed RDD.
> Here is the exception
> {code}
> Job aborted due to stage failure: Task creation failed: 
> java.io.FileNotFoundException: File 
> file:/tmp/spark-e2046e49-5d8a-4633-b525-52a00bab32f0/17d9b542-d48d-4f23-b368-3957a42644a6/rdd-20/part-0
>  does not exist
> java.io.FileNotFoundException: File 
> file:/tmp/spark-e2046e49-5d8a-4633-b525-52a00bab32f0/17d9b542-d48d-4f23-b368-3957a42644a6/rdd-20/part-0
>  does not exist
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
>   at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
>   at 
> org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>   at scala.Option.map(Option.scala:146)
>   at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1708)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1707)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1707)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1707)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1705)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1705)
>   at 
> org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1671)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:989)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:987)
> {code}
> The issue seems to be in this [piece of 
> code|https://github.com/apache/spark/blob/0a7f5f2798b6e8b2ba15e8b3aa07d5953ad1c695/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala#L94]
> {code:java}
> if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
>   && sc.getCheckpointDir.nonEmpty) {
>   // Add new checkpoint before removing old checkpoints.
>   checkpoint(newData)
>   checkpointQueue.enqueue(newData)
>   // Remove checkpoints before the latest one.
>   var canDelete = true
>   while (checkpointQueue.size > 1 && canDelete) {
> // Delete the oldest checkpoint only if the next checkpoint exists.
> if (isCheckpointed(checkpointQueue.head)) {
>   removeCheckpointFile()
> } else {
>   canDelete = false
> }
>   }
> }
> {code}
> Given that _checkpointQueue.head_ is checkpointed and materialized and 
> _newData_ depends on _checkpointQueue.head_, then the exception happens on 
> action of 

[jira] [Commented] (SPARK-20928) SPIP: Continuous Processing Mode for Structured Streaming

2017-10-26 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220956#comment-16220956
 ] 

Reynold Xin commented on SPARK-20928:
-

That doesn't yet exist, does it? How would that work for non-map jobs, e.g. an 
aggregate? That said, if it is map-only, this can be tweaked to pass the 
offset ranges in addition to the epoch id.

> SPIP: Continuous Processing Mode for Structured Streaming
> -
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>  Labels: SPIP
> Attachments: Continuous Processing in Structured Streaming Design 
> Sketch.pdf
>
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user-requested change in parallelism).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22344) Prevent R CMD check from using /tmp

2017-10-26 Thread Shivaram Venkataraman (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220884#comment-16220884
 ] 

Shivaram Venkataraman commented on SPARK-22344:
---

Thanks for investigating. Is 
`/tmp/ubuntu/8201eb2c-8065-458c-b564-1e61b3dc5b7d/` a symlink to 
`/tmp/hive/ubuntu/8201eb2c-8065-458c-b564-1e61b3dc5b7d/` or are they just 
different directories?

We can disable the hsperfdata with the suggested flag and also change 
java.io.tmpdir, which should at least fix the block manager, I think. I will open 
a PR for this.

Regarding the Hive directories being created even though Hive is off, I have no idea 
why that is happening. [~falaki] [~hyukjin.kwon] do you have any idea why this 
happens?
 

> Prevent R CMD check from using /tmp
> ---
>
> Key: SPARK-22344
> URL: https://issues.apache.org/jira/browse/SPARK-22344
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 1.6.3, 2.1.2, 2.2.0, 2.3.0
>Reporter: Shivaram Venkataraman
>
> When R CMD check is run on the SparkR package it leaves behind files in /tmp 
> which is a violation of CRAN policy. We should instead write to Rtmpdir. 
> Notes from CRAN are below
> {code}
> Checking this leaves behind dirs
>hive/$USER
>$USER
> and files named like
>b4f6459b-0624-4100-8358-7aa7afbda757_resources
> in /tmp, in violation of the CRAN Policy.
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-11334) numRunningTasks can't be less than 0, or it will affect executor allocation

2017-10-26 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220881#comment-16220881
 ] 

Apache Spark commented on SPARK-11334:
--

User 'sitalkedia' has created a pull request for this issue:
https://github.com/apache/spark/pull/19580

> numRunningTasks can't be less than 0, or it will affect executor allocation
> ---
>
> Key: SPARK-11334
> URL: https://issues.apache.org/jira/browse/SPARK-11334
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.4.0
>Reporter: meiyoula
>Assignee: meiyoula
>
> With the *Dynamic Allocation* feature, when a task fails more than *maxFailure* 
> times, all the dependent jobs, stages, and tasks will be killed or aborted. In this 
> process, the *SparkListenerTaskEnd* event can arrive after *SparkListenerStageCompleted* 
> and *SparkListenerJobEnd*, like the event log below:
> {code}
> {"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":20,"Stage 
> Attempt ID":0,"Stage Name":"run at AccessController.java:-2","Number of 
> Tasks":200}
> {"Event":"SparkListenerJobEnd","Job ID":9,"Completion Time":1444914699829}
> {"Event":"SparkListenerTaskEnd","Stage ID":20,"Stage Attempt ID":0,"Task 
> Type":"ResultTask","Task End Reason":{"Reason":"TaskKilled"},"Task 
> Info":{"Task ID":1955,"Index":88,"Attempt":2,"Launch 
> Time":1444914699763,"Executor 
> ID":"5","Host":"linux-223","Locality":"PROCESS_LOCAL","Speculative":false,"Getting
>  Result Time":0,"Finish Time":1444914699864,"Failed":true,"Accumulables":[]}}
> {code}
> Because of that, the *numRunningTasks* in the *ExecutorAllocationManager* class can 
> become less than 0, and it will affect executor allocation.
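For illustration, a minimal sketch (with simplified names, not the actual patch in the linked PR)
of the defensive counting this implies for the allocation manager's listener:

{code}
// Illustrative only: clamp the counter at zero so a late SparkListenerTaskEnd
// (arriving after the stage/job end events, as in the log above) cannot drive
// the running-task count negative.
object RunningTaskCounter {
  private var numRunningTasks = 0

  def onTaskStart(): Unit = synchronized { numRunningTasks += 1 }

  def onTaskEnd(): Unit = synchronized {
    numRunningTasks = math.max(numRunningTasks - 1, 0)
  }

  def running: Int = synchronized { numRunningTasks }
}
{code}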



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22312) Spark job stuck with no executor due to bug in Executor Allocation Manager

2017-10-26 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220857#comment-16220857
 ] 

Apache Spark commented on SPARK-22312:
--

User 'sitalkedia' has created a pull request for this issue:
https://github.com/apache/spark/pull/19580

> Spark job stuck with no executor due to bug in Executor Allocation Manager
> --
>
> Key: SPARK-22312
> URL: https://issues.apache.org/jira/browse/SPARK-22312
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.2.0
>Reporter: Sital Kedia
>
> We often see the issue of Spark jobs getting stuck because the Executor Allocation 
> Manager does not ask for any executors, even if there are pending tasks, when 
> dynamic allocation is turned on. Looking at the logic in the EAM which calculates 
> the running tasks, it can happen that the calculation will be wrong and the 
> number of running tasks can become negative. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-22364) unix_timestamp function sets valid dates to null

2017-10-26 Thread Sean Owen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-22364.
---
Resolution: Not A Problem

These functions largely mimic Hive on purpose, and I think that's just how 
unix_timestamp("...") is defined to behave. I agree about timezones, and so 
would virtually always recommend working with timestamps at the app level, not 
date strings without timezones. If you have a date string with a timezone you 
should be able to parse it unambiguously with to_date or to_timestamp.
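One way to sidestep the DST gap when the strings carry no timezone is to parse them in a zone
without DST transitions, e.g. UTC. A minimal sketch, assuming you are free to change the JVM
default timezone for the whole process (a heavy-handed but simple lever in 2.2):

{code}
import java.util.TimeZone
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.unix_timestamp

object DstSafeParse {
  def main(args: Array[String]): Unit = {
    // Parse in UTC so the 01:00-01:59 "spring forward" gap of Europe/London
    // does not exist and no input maps to a nonexistent local time.
    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))

    val spark = SparkSession.builder.master("local[*]").appName("dst-safe").getOrCreate()
    import spark.implicits._

    Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", "25/03/2012 02:01")
      .toDF("date")
      .select(unix_timestamp($"date", "dd/MM/yyyy HH:mm"))
      .show(false)

    spark.stop()
  }
}
{code}

Note this changes which wall clock the strings are interpreted in, so the resulting epoch seconds
shift accordingly; whether that is acceptable is an application-level decision.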

> unix_timestamp function sets valid dates to null
> 
>
> Key: SPARK-22364
> URL: https://issues.apache.org/jira/browse/SPARK-22364
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: Windows 10, United Kingdom
>Reporter: Matthew Sinton-Hewitt
>
> org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null.
> The dates happen to be at the start of Daylight Saving Time (UK and 
> possibly elsewhere).
> {code:java}
> val spark = SparkSession.builder.getOrCreate()
> import spark.implicits._
> spark.sparkContext.parallelize(
>Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", 
> "25/03/2012 02:01"))
>   .toDF("date")
>   .select(unix_timestamp($"date", "dd/MM/yyyy HH:mm"))
>   .show(false)
> // results:
> // 1332637140, null, null, 1332637260
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22042) ReorderJoinPredicates can break when child's partitioning is not decided

2017-10-26 Thread Andrew Ash (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220734#comment-16220734
 ] 

Andrew Ash commented on SPARK-22042:


Hi, I'm seeing this problem as well; thanks for investigating and putting up a 
PR [~tejasp]! Have you been running any of your clusters with a patched 
version of Spark that includes that change, and has it been behaving as expected?

The repro one of my users independently provided was this:
{noformat}
val rows = List(1, 2, 3, 4, 5, 6);
 
val df1 = sc.parallelize(rows).toDF("col").repartition(1);
val df2 = sc.parallelize(rows).toDF("col").repartition(2);
val df3 = sc.parallelize(rows).toDF("col").repartition(2);
 
val dd1 = df1.join(df2, df1.col("col").equalTo(df2.col("col"))).join(df3, 
df2.col("col").equalTo(df3.col("col")));
 
dd1.show;
{noformat}

> ReorderJoinPredicates can break when child's partitioning is not decided
> 
>
> Key: SPARK-22042
> URL: https://issues.apache.org/jira/browse/SPARK-22042
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Tejas Patil
>Priority: Minor
>
> When `ReorderJoinPredicates` tries to get the `outputPartitioning` of its 
> children, the children may not be properly constructed as the child-subtree 
> has to still go through other planner rules.
> In this particular case, the child is `SortMergeJoinExec`. Since the required 
> `Exchange` operators are not in place (because `EnsureRequirements` runs 
> _after_ `ReorderJoinPredicates`), the join's children would not have 
> partitioning defined. This breaks while creating the `PartitioningCollection` 
> here: 
> https://github.com/apache/spark/blob/94439997d57875838a8283c543f9b44705d3a503/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L69
> Small repro:
> {noformat}
> context.sql("SET spark.sql.autoBroadcastJoinThreshold=0")
> val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", 
> "k")
> df.write.format("parquet").saveAsTable("table1")
> df.write.format("parquet").saveAsTable("table2")
> df.write.format("parquet").bucketBy(8, "j", "k").saveAsTable("bucketed_table")
> sql("""
>   SELECT *
>   FROM (
> SELECT a.i, a.j, a.k
> FROM bucketed_table a
> JOIN table1 b
> ON a.i = b.i
>   ) c
>   JOIN table2
>   ON c.i = table2.i
> """).explain
> {noformat}
> This fails with :
> {noformat}
> java.lang.IllegalArgumentException: requirement failed: 
> PartitioningCollection requires all of its partitionings have the same 
> numPartitions.
>   at scala.Predef$.require(Predef.scala:224)
>   at 
> org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.(partitioning.scala:324)
>   at 
> org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:69)
>   at 
> org.apache.spark.sql.execution.ProjectExec.outputPartitioning(basicPhysicalOperators.scala:82)
>   at 
> org.apache.spark.sql.execution.joins.ReorderJoinPredicates$$anonfun$apply$1.applyOrElse(ReorderJoinPredicates.scala:91)
>   at 
> org.apache.spark.sql.execution.joins.ReorderJoinPredicates$$anonfun$apply$1.applyOrElse(ReorderJoinPredicates.scala:76)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
>   at 
> org.apache.spark.sql.execution.joins.ReorderJoinPredicates.apply(ReorderJoinPredicates.scala:76)
>   at 
> org.apache.spark.sql.execution.joins.ReorderJoinPredicates.apply(ReorderJoinPredicates.scala:34)
>   at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:100)
>   at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:100)
>   at 
> scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>   at scala.collection.immutable.List.foldLeft(List.scala:84)
>   at 
> org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:100)
>   at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:90)
>   at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:90)
>   at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:201)
>   at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:201)
>   at 
> 

[jira] [Commented] (SPARK-22364) unix_timestamp function sets valid dates to null

2017-10-26 Thread Matthew Sinton-Hewitt (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220697#comment-16220697
 ] 

Matthew Sinton-Hewitt commented on SPARK-22364:
---

Yes, I think you're right. 

However, it would be really great if there was a simple way to tell 
*unix_timestamp* to use a specific timezone. 
It should not be assumed that the JVM timezone will apply in all cases. You may 
be dealing with a variety of date sources.

> unix_timestamp function sets valid dates to null
> 
>
> Key: SPARK-22364
> URL: https://issues.apache.org/jira/browse/SPARK-22364
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: Windows 10, United Kingdom
>Reporter: Matthew Sinton-Hewitt
>
> org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null.
> The dates happen to be at the start of Daylight Saving Time (UK and 
> possibly elsewhere).
> {code:java}
> val spark = SparkSession.builder.getOrCreate()
> import spark.implicits._
> spark.sparkContext.parallelize(
>Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", 
> "25/03/2012 02:01"))
>   .toDF("date")
>   .select(unix_timestamp($"date", "dd/MM/yyyy HH:mm"))
>   .show(false)
> // results:
> // 1332637140, null, null, 1332637260
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20928) SPIP: Continuous Processing Mode for Structured Streaming

2017-10-26 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220694#comment-16220694
 ] 

Cody Koeninger commented on SPARK-20928:


Can you clarify how this impacts sinks having access to the underlying Kafka 
offsets? E.g. https://issues.apache.org/jira/browse/SPARK-18258

> SPIP: Continuous Processing Mode for Structured Streaming
> -
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>  Labels: SPIP
> Attachments: Continuous Processing in Structured Streaming Design 
> Sketch.pdf
>
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user-requested change in parallelism).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-20643) Implement listener for saving application status data in key-value store

2017-10-26 Thread Imran Rashid (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Imran Rashid resolved SPARK-20643.
--
   Resolution: Fixed
Fix Version/s: 2.3.0

Issue resolved by pull request 19383
[https://github.com/apache/spark/pull/19383]

> Implement listener for saving application status data in key-value store
> 
>
> Key: SPARK-20643
> URL: https://issues.apache.org/jira/browse/SPARK-20643
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Marcelo Vanzin
> Fix For: 2.3.0
>
>
> See spec in parent issue (SPARK-18085) for more details.
> This task tracks adding a new listener that will save application state to 
> the key-value store added in SPARK-20641; the listener will eventually 
> replace the existing listeners (such as JobProgressListener and 
> StatusListener), and the UI code will read data directly from the key-value 
> store instead of being coupled to the listener implementation.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-20643) Implement listener for saving application status data in key-value store

2017-10-26 Thread Imran Rashid (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Imran Rashid reassigned SPARK-20643:


Assignee: Marcelo Vanzin

> Implement listener for saving application status data in key-value store
> 
>
> Key: SPARK-20643
> URL: https://issues.apache.org/jira/browse/SPARK-20643
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Marcelo Vanzin
>Assignee: Marcelo Vanzin
> Fix For: 2.3.0
>
>
> See spec in parent issue (SPARK-18085) for more details.
> This task tracks adding a new listener that will save application state to 
> the key-value store added in SPARK-20641; the listener will eventually 
> replace the existing listeners (such as JobProgressListener and 
> StatusListener), and the UI code will read data directly from the key-value 
> store instead of being coupled to the listener implementation.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22365) Spark UI executors empty list with 500 error

2017-10-26 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220631#comment-16220631
 ] 

Sean Owen commented on SPARK-22365:
---

Is there no other error following this? A "Caused by" log? If it's really an 
error from inside Jersey, I'm not sure it's attributable to Spark, but it still 
bears investigating. If it's actually an NPE from Spark code, then we need to 
see where. Unless anyone can reproduce it, it's hard to say what could be done.

> Spark UI executors empty list with 500 error
> 
>
> Key: SPARK-22365
> URL: https://issues.apache.org/jira/browse/SPARK-22365
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.2.0
>Reporter: Jakub Dubovsky
>
> No data is loaded on the "executors" tab in the Spark UI; stack trace below. Apart 
> from the exception I have nothing more. But if I can test something to make this 
> easier to resolve, I am happy to help.
> {{java.lang.NullPointerException
>   at 
> org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)
>   at 
> org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341)
>   at 
> org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228)
>   at 
> org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:845)
>   at 
> org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1689)
>   at 
> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:164)
>   at 
> org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1676)
>   at 
> org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:581)
>   at 
> org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
>   at 
> org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
>   at 
> org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
>   at 
> org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
>   at 
> org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:461)
>   at 
> org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
>   at 
> org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
>   at org.spark_project.jetty.server.Server.handle(Server.java:524)
>   at 
> org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319)
>   at 
> org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253)
>   at 
> org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)
>   at 
> org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95)
>   at 
> org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
>   at 
> org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
>   at 
> org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
>   at 
> org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
>   at 
> org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
>   at 
> org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
>   at java.lang.Thread.run(Thread.java:748)}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22365) Spark UI executors empty list with 500 error

2017-10-26 Thread Jakub Dubovsky (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220630#comment-16220630
 ] 

Jakub Dubovsky commented on SPARK-22365:


Is there anything I can do to investigate? I am a backend developer, so maybe I am 
missing some trivial way to check something...

> Spark UI executors empty list with 500 error
> 
>
> Key: SPARK-22365
> URL: https://issues.apache.org/jira/browse/SPARK-22365
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.2.0
>Reporter: Jakub Dubovsky
>
> No data is loaded on the "executors" tab in the Spark UI; stack trace below. Apart 
> from the exception I have nothing more. But if I can test something to make this 
> easier to resolve, I am happy to help.
> {{java.lang.NullPointerException
>   at 
> org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)
>   at 
> org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341)
>   at 
> org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228)
>   at 
> org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:845)
>   at 
> org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1689)
>   at 
> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:164)
>   at 
> org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1676)
>   at 
> org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:581)
>   at 
> org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
>   at 
> org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
>   at 
> org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
>   at 
> org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
>   at 
> org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:461)
>   at 
> org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
>   at 
> org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
>   at org.spark_project.jetty.server.Server.handle(Server.java:524)
>   at 
> org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319)
>   at 
> org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253)
>   at 
> org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)
>   at 
> org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95)
>   at 
> org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
>   at 
> org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
>   at 
> org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
>   at 
> org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
>   at 
> org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
>   at 
> org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
>   at java.lang.Thread.run(Thread.java:748)}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22365) Spark UI executors empty list with 500 error

2017-10-26 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220627#comment-16220627
 ] 

Sean Owen commented on SPARK-22365:
---

This doesn't actually show the underlying error; it just reports that an NPE 
happened elsewhere. It's hard to do anything without more info, and I haven't seen 
this.

> Spark UI executors empty list with 500 error
> 
>
> Key: SPARK-22365
> URL: https://issues.apache.org/jira/browse/SPARK-22365
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.2.0
>Reporter: Jakub Dubovsky
>
> No data is loaded on the "executors" tab in the Spark UI; stack trace below. Apart 
> from the exception I have nothing more. But if I can test something to make this 
> easier to resolve, I am happy to help.
> {{java.lang.NullPointerException
>   at 
> org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)
>   at 
> org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341)
>   at 
> org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228)
>   at 
> org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:845)
>   at 
> org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1689)
>   at 
> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:164)
>   at 
> org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1676)
>   at 
> org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:581)
>   at 
> org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
>   at 
> org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
>   at 
> org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
>   at 
> org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
>   at 
> org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:461)
>   at 
> org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
>   at 
> org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
>   at org.spark_project.jetty.server.Server.handle(Server.java:524)
>   at 
> org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319)
>   at 
> org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253)
>   at 
> org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)
>   at 
> org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95)
>   at 
> org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
>   at 
> org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
>   at 
> org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
>   at 
> org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
>   at 
> org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
>   at 
> org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
>   at java.lang.Thread.run(Thread.java:748)}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22365) Spark UI executors empty list with 500 error

2017-10-26 Thread Jakub Dubovsky (JIRA)
Jakub Dubovsky created SPARK-22365:
--

 Summary: Spark UI executors empty list with 500 error
 Key: SPARK-22365
 URL: https://issues.apache.org/jira/browse/SPARK-22365
 Project: Spark
  Issue Type: Bug
  Components: Web UI
Affects Versions: 2.2.0
Reporter: Jakub Dubovsky


No data is loaded on the "executors" tab in the Spark UI; stack trace below. Apart from 
the exception I have nothing more. But if I can test something to make this easier 
to resolve, I am happy to help.

{{java.lang.NullPointerException
at 
org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)
at 
org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341)
at 
org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228)
at 
org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:845)
at 
org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1689)
at 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:164)
at 
org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1676)
at 
org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:581)
at 
org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
at 
org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
at 
org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
at 
org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at 
org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:461)
at 
org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
at 
org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at org.spark_project.jetty.server.Server.handle(Server.java:524)
at 
org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319)
at 
org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253)
at 
org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)
at 
org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95)
at 
org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
at 
org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
at 
org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
at 
org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
at 
org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
at 
org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
at java.lang.Thread.run(Thread.java:748)}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22364) unix_timestamp function sets valid dates to null

2017-10-26 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220483#comment-16220483
 ] 

Sean Owen commented on SPARK-22364:
---

Is the problem that this hour doesn't exist in the timezone because it's the 
hour that is skipped when the clocks move forwards?
See https://stackoverflow.com/questions/14201469/java-date-and-daylight-saving 
which actually talks about the exact same date.

> unix_timestamp function sets valid dates to null
> 
>
> Key: SPARK-22364
> URL: https://issues.apache.org/jira/browse/SPARK-22364
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: Windows 10, United Kingdom
>Reporter: Matthew Sinton-Hewitt
>
> org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null.
> The dates happen to be at the start of Daylight Saving Time (UK and 
> possibly elsewhere).
> {code:java}
> val spark = SparkSession.builder.getOrCreate()
> import spark.implicits._
> spark.sparkContext.parallelize(
>Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", 
> "25/03/2012 02:01"))
>   .toDF("date")
>   .select(unix_timestamp($"date", "dd/MM/yyyy HH:mm"))
>   .show(false)
> // results:
> // 1332637140, null, null, 1332637260
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22364) unix_timestamp function sets valid dates to null

2017-10-26 Thread Matthew Sinton-Hewitt (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew Sinton-Hewitt updated SPARK-22364:
--
Description: 
org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null.

The dates happen to be at the start of Daylight Saving Time (UK and 
possibly elsewhere).

{code:java}

val spark = SparkSession.builder.getOrCreate()

import spark.implicits._

spark.sparkContext.parallelize(
   Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", "25/03/2012 
02:01"))
  .toDF("date")
  .select(unix_timestamp($"date", "dd/MM/yyyy HH:mm"))
  .show(false)

// results:
// 1332637140, null, null, 1332637260
{code}

  was:
org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null.

The dates happen to be at the start of Daylight Saving Time (UK and 
possibly elsewhere).

{code:java}
import spark.implicits._

SparkSession.builder.getOrCreate()
.sparkContext.parallelize(
   Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", "25/03/2012 
02:01"))
  .toDF("date")
  .select(unix_timestamp($"date", "dd/MM/yyyy HH:mm"))
  .show(false)

// results:
// 1332637140, null, null, 1332637260
{code}


> unix_timestamp function sets valid dates to null
> 
>
> Key: SPARK-22364
> URL: https://issues.apache.org/jira/browse/SPARK-22364
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: Windows 10, United Kingdom
>Reporter: Matthew Sinton-Hewitt
>
> org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null.
> The dates happen to be at the start of Daylight Saving Time (UK and 
> possibly elsewhere).
> {code:java}
> val spark = SparkSession.builder.getOrCreate()
> import spark.implicits._
> spark.sparkContext.parallelize(
>Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", 
> "25/03/2012 02:01"))
>   .toDF("date")
>   .select(unix_timestamp($"date", "dd/MM/yyyy HH:mm"))
>   .show(false)
> // results:
> // 1332637140, null, null, 1332637260
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22364) unix_timestamp function sets valid dates to null

2017-10-26 Thread Matthew Sinton-Hewitt (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew Sinton-Hewitt updated SPARK-22364:
--
Description: 
org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null.

The dates happen to be at the start of Daylight Saving Time (UK and 
possibly elsewhere).

{code:java}
import spark.implicits._

SparkSession.builder.getOrCreate()
.sparkContext.parallelize(
   Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", "25/03/2012 
02:01"))
  .toDF("date")
  .select(unix_timestamp($"date", "dd/MM/yyyy HH:mm"))
  .show(false)

// results:
// 1332637140, null, null, 1332637260
{code}

  was:
org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null.

The dates happen to be at the start of Daylight Saving Time (UK and 
possibly elsewhere).

{code:scala}
import spark.implicits._

SparkSession.builder.getOrCreate()
.sparkContext.parallelize(
   Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", "25/03/2012 
02:01"))
  .toDF("date")
  .select(unix_timestamp($"date", "dd/MM/yyyy HH:mm"))
  .show(false)

// results:
// 1332637140, null, null, 1332637260
{code}


> unix_timestamp function sets valid dates to null
> 
>
> Key: SPARK-22364
> URL: https://issues.apache.org/jira/browse/SPARK-22364
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
> Environment: Windows 10, United Kingdom
>Reporter: Matthew Sinton-Hewitt
>
> org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null.
> The dates happen to be at the start of Daylight Saving Time (UK and 
> possibly elsewhere).
> {code:java}
> import spark.implicits._
> SparkSession.builder.getOrCreate()
> .sparkContext.parallelize(
>Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", 
> "25/03/2012 02:01"))
>   .toDF("date")
>   .select(unix_timestamp($"date", "dd/MM/yyyy HH:mm"))
>   .show(false)
> // results:
> // 1332637140, null, null, 1332637260
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)

2017-10-26 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220469#comment-16220469
 ] 

Sean Owen commented on SPARK-21657:
---

I suspect that something somewhere is doing something that's linear-time that 
looks like it should be constant-time, like referencing a linked list by index. 
See https://issues.apache.org/jira/browse/SPARK-22330 for a similar type of 
thing (though I don't think it's the same issue as this one).
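As a tiny illustration of that suspicion (generic Scala, unrelated to the explode code path
itself): indexing an immutable List inside a loop turns what looks like a linear pass into
quadratic work, whereas an indexed structure does not.

{code}
// Illustrative micro-example only, not Spark code: List.apply(i) walks i nodes,
// so the first loop is O(n^2), while Vector.apply(i) is effectively constant-time.
val n = 20000
val xs = List.tabulate(n)(identity)
val vs = xs.toVector

def time[A](label: String)(f: => A): A = {
  val t0 = System.nanoTime(); val r = f
  println(f"$label: ${(System.nanoTime() - t0) / 1e6}%.1f ms"); r
}

time("List by index")   { var s = 0L; var i = 0; while (i < n) { s += xs(i); i += 1 }; s }
time("Vector by index") { var s = 0L; var i = 0; while (i < n) { s += vs(i); i += 1 }; s }
{code}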

> Spark has exponential time complexity to explode(array of structs)
> --
>
> Key: SPARK-21657
> URL: https://issues.apache.org/jira/browse/SPARK-21657
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0, 2.3.0
>Reporter: Ruslan Dautkhanov
>  Labels: cache, caching, collections, nested_types, performance, 
> pyspark, sparksql, sql
> Attachments: ExponentialTimeGrowth.PNG, 
> nested-data-generator-and-test.py
>
>
> It can take up to half a day to explode a modest-sized nested collection 
> (0.5m).
> On recent Xeon processors.
> See attached pyspark script that reproduces this problem.
> {code}
> cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + 
> table_name).cache()
> print sqlc.count()
> {code}
> This script generates a number of tables, with the same total number of 
> records across all nested collections (see the `scaling` variable in the loops). 
> The `scaling` variable scales up how many nested elements are in each record, but by 
> the same factor scales down the number of records in the table, so the total number 
> of records stays the same.
> Time grows exponentially (notice log-10 vertical axis scale):
> !ExponentialTimeGrowth.PNG!
> At scaling of 50,000 (see attached pyspark script), it took 7 hours to 
> explode the nested collections (\!) of 8k records.
> After 1000 elements in nested collection, time grows exponentially.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22364) unix_timestamp function sets valid dates to null

2017-10-26 Thread Matthew Sinton-Hewitt (JIRA)
Matthew Sinton-Hewitt created SPARK-22364:
-

 Summary: unix_timestamp function sets valid dates to null
 Key: SPARK-22364
 URL: https://issues.apache.org/jira/browse/SPARK-22364
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.0
 Environment: Windows 10, United Kingdom
Reporter: Matthew Sinton-Hewitt


org.apache.spark.sql.functions.unix_timestamp sets some valid dates to null.

The dates happen to be at the start of Daylight Saving Time (UK and possibly elsewhere).

{code:scala}
import spark.implicits._

SparkSession.builder.getOrCreate()
.sparkContext.parallelize(
   Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", "25/03/2012 
02:01"))
  .toDF("date")
  .select(unix_timestamp($"date", "dd/MM/ HH:mm"))
  .show(false)

// results:
// 1332637140, null, null, 1332637260
{code}
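
A hedged workaround sketch, assuming the nulls come from the JVM default timezone being Europe/London, where 01:00-01:59 on 25/03/2012 does not exist because of the DST spring-forward: parsing in a gap-free timezone such as UTC makes all four rows return non-null epoch seconds. Which setting unix_timestamp honours depends on the Spark version, so this sets both the JVM default and the session timezone.

{code:scala}
import java.util.TimeZone
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.unix_timestamp

// Workaround sketch, not a fix: parse in a timezone with no DST gap (UTC here).
// Adjust if the timestamps must be interpreted as local UK wall-clock time.
TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
val spark = SparkSession.builder
  .config("spark.sql.session.timeZone", "UTC")
  .getOrCreate()
import spark.implicits._

Seq("25/03/2012 00:59", "25/03/2012 01:00", "25/03/2012 01:59", "25/03/2012 02:01")
  .toDF("date")
  .select(unix_timestamp($"date", "dd/MM/yyyy HH:mm"))
  .show(false)
{code}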



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21657) Spark has exponential time complexity to explode(array of structs)

2017-10-26 Thread Ohad Raviv (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220450#comment-16220450
 ] 

Ohad Raviv commented on SPARK-21657:


Hi,
Wanted to add that we're facing exactly the same issue: 6 hours of work for one row that contains a 250k-element array (of structs of 4 strings).
Just wanted to state that if we explode only the array, e.g. in your example:
cached_df = sqlc.sql('select explode(amft) from ' + table_name)

it finishes in about 3 minutes.
It happens in Spark 2.1 and also 2.2, even though SPARK-16998 was resolved.
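
A sketch of the pattern that observation suggests (column names `individ`, `hholdid`, `amft` and the table name are placeholders taken from the reproduction script, not a confirmed fix): explode only the key and the array, then join the remaining columns back, instead of carrying them through the explode.

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
val tableName = "nested_table" // placeholder

// Explode just the key + array, which the comment reports as fast...
val exploded = spark.sql(s"SELECT individ, explode(amft) AS amft_elem FROM $tableName")
// ...then join the other top-level columns back by key.
val rest = spark.sql(s"SELECT individ, hholdid FROM $tableName")
val result = exploded.join(rest, "individ")
result.count()
{code}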

> Spark has exponential time complexity to explode(array of structs)
> --
>
> Key: SPARK-21657
> URL: https://issues.apache.org/jira/browse/SPARK-21657
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.0.0, 2.1.0, 2.1.1, 2.2.0, 2.3.0
>Reporter: Ruslan Dautkhanov
>  Labels: cache, caching, collections, nested_types, performance, 
> pyspark, sparksql, sql
> Attachments: ExponentialTimeGrowth.PNG, 
> nested-data-generator-and-test.py
>
>
> It can take up to half a day to explode a modest-sized nested collection 
> (0.5m).
> On a recent Xeon processors.
> See attached pyspark script that reproduces this problem.
> {code}
> cached_df = sqlc.sql('select individ, hholdid, explode(amft) from ' + 
> table_name).cache()
> print cached_df.count()
> {code}
> This script generate a number of tables, with the same total number of 
> records across all nested collection (see `scaling` variable in loops). 
> `scaling` variable scales up how many nested elements in each record, but by 
> the same factor scales down number of records in the table. So total number 
> of records stays the same.
> Time grows exponentially (notice log-10 vertical axis scale):
> !ExponentialTimeGrowth.PNG!
> At scaling of 50,000 (see attached pyspark script), it took 7 hours to 
> explode the nested collections (\!) of 8k records.
> After 1000 elements in nested collection, time grows exponentially.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter

2017-10-26 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220431#comment-16220431
 ] 

Saisai Shao commented on SPARK-22357:
-

Yes, I know this parameter is ignored, but I'm not sure whether that is intended or not. If it breaks your case I think we should fix it anyway.
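
Until that is settled, a hedged workaround sketch (path and partition count are arbitrary placeholders): since the minPartitions hint is currently ignored, an explicit repartition after the read restores control over parallelism.

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
val sc = spark.sparkContext

// The minPartitions argument is ignored by the current code path (this ticket),
// so follow the read with an explicit repartition; 512 is an arbitrary placeholder.
val files = sc.binaryFiles("hdfs:///path/to/binary/files", minPartitions = 512)
  .repartition(512)
println(files.getNumPartitions)
{code}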

> SparkContext.binaryFiles ignore minPartitions parameter
> ---
>
> Key: SPARK-22357
> URL: https://issues.apache.org/jira/browse/SPARK-22357
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.0
>Reporter: Weichen Xu
>
> this is a bug in binaryFiles - even though we give it the partitions, 
> binaryFiles ignores it.
> This is a bug introduced in spark 2.1 from spark 2.0, in file 
> PortableDataStream.scala the argument “minPartitions” is no longer used (with 
> the push to master on 11/7/6):
> {code}
> /**
>  * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API,
>  * which is set through setMaxSplitSize
>  */
> def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
>   val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
>   val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
>   val defaultParallelism = sc.defaultParallelism
>   val files = listStatus(context).asScala
>   val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
>   val bytesPerCore = totalBytes / defaultParallelism
>   val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
>   super.setMaxSplitSize(maxSplitSize)
> }
> {code}
> The code previously, in version 2.0, was:
> {code}
> def setMinPartitions(context: JobContext, minPartitions: Int) {
>   val totalLen = listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
>   val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
>   super.setMaxSplitSize(maxSplitSize)
> }
> {code}
> The new code is very smart, but it ignores what the user passes in and uses 
> the data size, which is kind of a breaking change in some sense
> In our specific case this was a problem, because we initially read in just 
> the files names and only after that the dataframe becomes very large, when 
> reading in the images themselves – and in this case the new code does not 
> handle the partitioning very well.
> I’m not sure if it can be easily fixed because I don’t understand the full 
> context of the change in spark (but at the very least the unused parameter 
> should be removed to avoid confusion).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter

2017-10-26 Thread Weichen Xu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220417#comment-16220417
 ] 

Weichen Xu commented on SPARK-22357:


[~jerryshao] I checked the code; it does indeed ignore the `minPartitions` parameter. About this text, [~imatiach], can you help explain more?

> SparkContext.binaryFiles ignore minPartitions parameter
> ---
>
> Key: SPARK-22357
> URL: https://issues.apache.org/jira/browse/SPARK-22357
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.0
>Reporter: Weichen Xu
>
> this is a bug in binaryFiles - even though we give it the partitions, 
> binaryFiles ignores it.
> This is a bug introduced in spark 2.1 from spark 2.0, in file 
> PortableDataStream.scala the argument “minPartitions” is no longer used (with 
> the push to master on 11/7/6):
> {code}
> /**
>  * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API,
>  * which is set through setMaxSplitSize
>  */
> def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
>   val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
>   val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
>   val defaultParallelism = sc.defaultParallelism
>   val files = listStatus(context).asScala
>   val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
>   val bytesPerCore = totalBytes / defaultParallelism
>   val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
>   super.setMaxSplitSize(maxSplitSize)
> }
> {code}
> The code previously, in version 2.0, was:
> {code}
> def setMinPartitions(context: JobContext, minPartitions: Int) {
>   val totalLen = listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
>   val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
>   super.setMaxSplitSize(maxSplitSize)
> }
> {code}
> The new code is very smart, but it ignores what the user passes in and uses 
> the data size, which is kind of a breaking change in some sense
> In our specific case this was a problem, because we initially read in just 
> the files names and only after that the dataframe becomes very large, when 
> reading in the images themselves – and in this case the new code does not 
> handle the partitioning very well.
> I’m not sure if it can be easily fixed because I don’t understand the full 
> context of the change in spark (but at the very least the unused parameter 
> should be removed to avoid confusion).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter

2017-10-26 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220395#comment-16220395
 ] 

Saisai Shao commented on SPARK-22357:
-

bq. In our specific case this was a problem, because we initially read in just 
the files names and only after that the dataframe becomes very large, when 
reading in the images themselves – and in this case the new code does not 
handle the partitioning very well.

Would you please explain more about this?

> SparkContext.binaryFiles ignore minPartitions parameter
> ---
>
> Key: SPARK-22357
> URL: https://issues.apache.org/jira/browse/SPARK-22357
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.0
>Reporter: Weichen Xu
>
> this is a bug in binaryFiles - even though we give it the partitions, 
> binaryFiles ignores it.
> This is a bug introduced in spark 2.1 from spark 2.0, in file 
> PortableDataStream.scala the argument “minPartitions” is no longer used (with 
> the push to master on 11/7/6):
> {code}
> /**
>  * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API,
>  * which is set through setMaxSplitSize
>  */
> def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
>   val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
>   val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
>   val defaultParallelism = sc.defaultParallelism
>   val files = listStatus(context).asScala
>   val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
>   val bytesPerCore = totalBytes / defaultParallelism
>   val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
>   super.setMaxSplitSize(maxSplitSize)
> }
> {code}
> The code previously, in version 2.0, was:
> {code}
> def setMinPartitions(context: JobContext, minPartitions: Int) {
>   val totalLen = listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
>   val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
>   super.setMaxSplitSize(maxSplitSize)
> }
> {code}
> The new code is very smart, but it ignores what the user passes in and uses 
> the data size, which is kind of a breaking change in some sense
> In our specific case this was a problem, because we initially read in just 
> the files names and only after that the dataframe becomes very large, when 
> reading in the images themselves – and in this case the new code does not 
> handle the partitioning very well.
> I’m not sure if it can be easily fixed because I don’t understand the full 
> context of the change in spark (but at the very least the unused parameter 
> should be removed to avoid confusion).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22323) Design doc for different types of pandas_udf

2017-10-26 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220373#comment-16220373
 ] 

Hyukjin Kwon commented on SPARK-22323:
--

Let me leave a link to the discussion doc - 
https://docs.google.com/document/d/1KlLaa-xJ3oz28xlEJqXyCAHU3dwFYkFs_ixcUXrJNTc

> Design doc for different types of pandas_udf
> 
>
> Key: SPARK-22323
> URL: https://issues.apache.org/jira/browse/SPARK-22323
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Li Jin
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22363) Add unit test for Window spilling

2017-10-26 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-22363:


 Summary: Add unit test for Window spilling
 Key: SPARK-22363
 URL: https://issues.apache.org/jira/browse/SPARK-22363
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.3.0
Reporter: Jiang Xingbo


Cover the scenarios where WindowExec should spill at least once.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22362) Add unit test for Window Aggregate Functions

2017-10-26 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-22362:


 Summary: Add unit test for Window Aggregate Functions
 Key: SPARK-22362
 URL: https://issues.apache.org/jira/browse/SPARK-22362
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.3.0
Reporter: Jiang Xingbo


* Declarative
* Imperative
* UDAF



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22361) Add unit test for Window Frames

2017-10-26 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-22361:


 Summary: Add unit test for Window Frames
 Key: SPARK-22361
 URL: https://issues.apache.org/jira/browse/SPARK-22361
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.3.0
Reporter: Jiang Xingbo


* OffsetWindowFunctionFrame
* UnboundedWindowFunctionFrame
* SlidingWindowFunctionFrame
* UnboundedPrecedingWindowFunctionFrame
* UnboundedFollowingWindowFunctionFrame



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22360) Add unit test for Window Specifications

2017-10-26 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-22360:


 Summary: Add unit test for Window Specifications
 Key: SPARK-22360
 URL: https://issues.apache.org/jira/browse/SPARK-22360
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.3.0
Reporter: Jiang Xingbo


* different partition clauses (none, one, multiple)
* different order clauses (none, one, multiple, asc/desc, nulls first/last)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-17902) collect() ignores stringsAsFactors

2017-10-26 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-17902:


Assignee: Hyukjin Kwon

> collect() ignores stringsAsFactors
> --
>
> Key: SPARK-17902
> URL: https://issues.apache.org/jira/browse/SPARK-17902
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.0.1
>Reporter: Hossein Falaki
>Assignee: Hyukjin Kwon
> Fix For: 2.1.3, 2.2.1, 2.3.0
>
>
> `collect()` function signature includes an optional flag named 
> `stringsAsFactors`. It seems it is completely ignored.
> {code}
> str(collect(createDataFrame(iris), stringsAsFactors = TRUE))
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-17902) collect() ignores stringsAsFactors

2017-10-26 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-17902.
--
   Resolution: Fixed
Fix Version/s: 2.1.3
   2.3.0
   2.2.1

Issue resolved by pull request 19551
[https://github.com/apache/spark/pull/19551]

> collect() ignores stringsAsFactors
> --
>
> Key: SPARK-17902
> URL: https://issues.apache.org/jira/browse/SPARK-17902
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.0.1
>Reporter: Hossein Falaki
> Fix For: 2.2.1, 2.3.0, 2.1.3
>
>
> `collect()` function signature includes an optional flag named 
> `stringsAsFactors`. It seems it is completely ignored.
> {code}
> str(collect(createDataFrame(iris), stringsAsFactors = TRUE))
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22359) Improve the test coverage of window functions

2017-10-26 Thread Jiang Xingbo (JIRA)
Jiang Xingbo created SPARK-22359:


 Summary: Improve the test coverage of window functions
 Key: SPARK-22359
 URL: https://issues.apache.org/jira/browse/SPARK-22359
 Project: Spark
  Issue Type: Test
  Components: SQL
Affects Versions: 2.3.0
Reporter: Jiang Xingbo


There are already quite a few integration tests using window functions, but the unit test coverage for window functions is not ideal.
We'd like to test the following aspects (a minimal sketch of one such test follows the list):
* Specifications
** different partition clauses (none, one, multiple)
** different order clauses (none, one, multiple, asc/desc, nulls first/last)
* Frames and their combinations
** OffsetWindowFunctionFrame
** UnboundedWindowFunctionFrame
** SlidingWindowFunctionFrame
** UnboundedPrecedingWindowFunctionFrame
** UnboundedFollowingWindowFunctionFrame
* Aggregate function types
** Declarative
** Imperative
** UDAF
* Spilling
** Cover the conditions under which WindowExec should spill at least once
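
A minimal sketch of the kind of DataFrame-level case described above (names and values are illustrative, not taken from the eventual test suite): one partition clause, one ascending order clause, and a sliding row frame.

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, sum}

val spark = SparkSession.builder.master("local[2]").getOrCreate()
import spark.implicits._

val df = Seq(("a", 1), ("a", 2), ("a", 3), ("b", 10)).toDF("key", "value")

// One partition clause, one order clause (asc), and a sliding frame covering
// the preceding row plus the current row.
val ordered = Window.partitionBy($"key").orderBy($"value".asc)
val sliding = ordered.rowsBetween(-1, 0)

df.select(
    $"key",
    $"value",
    row_number().over(ordered).as("rn"),
    sum($"value").over(sliding).as("sliding_sum"))
  .show()
{code}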



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22358) ClassNotFoundException at NettyRpcEnv#deserialize when running Spark on Yarn-Cluster

2017-10-26 Thread Congxian Qiu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220280#comment-16220280
 ] 

Congxian Qiu commented on SPARK-22358:
--

I ran the same program many times; sometimes I got the above error, and other times the program ran successfully. I don't know how to reproduce the error.

> ClassNotFoundException at NettyRpcEnv#deserialize when running Spark on 
> Yarn-Cluster
> 
>
> Key: SPARK-22358
> URL: https://issues.apache.org/jira/browse/SPARK-22358
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.2
>Reporter: Congxian Qiu
>
> When running a Spark Streaming program on Yarn-Cluster mode, received the 
> following ERROR message, and the program hanged. I found 
> [Spark-10986](https://issues.apache.org/jira/browse/SPARK-10986) is similiar,
> I can't reproduce the error
> {noformat}
> [2017-10-26 16:53:18,274] ERROR Error while invoking RpcHandler#receive() for 
> one-way message. (org.apache.spark.network.server.TransportRequestHandler)
> java.lang.ClassNotFoundException: org.apache.spark.rpc.RpcAddvess
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:274)
>   at 
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at 
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
>   at 
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
>   at 
> org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:267)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>   at 
> org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:319)
>   at 
> org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1.apply(NettyRpcEnv.scala:266)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>   at 
> org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:265)
>   at 
> org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:597)
>   at 
> org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:586)
>   at 
> org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:176)
>   at 
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:92)
>   at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
>   at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>   at 
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>   at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>   at 
> 

[jira] [Comment Edited] (SPARK-9686) Spark Thrift server doesn't return correct JDBC metadata

2017-10-26 Thread Andriy Kushnir (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16217201#comment-16217201
 ] 

Andriy Kushnir edited comment on SPARK-9686 at 10/26/17 10:18 AM:
--

[~rxin], I did a little research for this error.
To invoke {{run()}} → {{runInternal()}} on any 
{{org.apache.hive.service.cli.operation.Operation}} (for example, 
{{GetSchemasOperation}}) we need {{IMetaStoreClient}}. Currently it's taken 
from {{HiveSession}} instance:
{code:java}
public class GetSchemasOperation extends MetadataOperation {
@Override
public void runInternal() throws HiveSQLException {
IMetaStoreClient metastoreClient = 
getParentSession().getMetaStoreClient();
}
}
{code}

All opened {{HiveSession}} s are handled by the {{org.apache.hive.service.cli.session.SessionManager}} instance.
{{SessionManager}}, along with others, implements the {{org.apache.hive.service.Service}} interface, and all {{Service}} s are initialized with the same Hive configuration:
{code:java}
public interface Service { 
void init(HiveConf conf);
}
{code}
When {{org.apache.spark.sql.hive.thriftserver.HiveThriftServer2}} initializes, 
all {{org.apache.hive.service.CompositeService}} s receive same {{HiveConf}}:

{code:java}
private[hive] class HiveThriftServer2(sqlContext: SQLContext) extends 
HiveServer2 with ReflectedCompositeService {
override def init(hiveConf: HiveConf) {
initCompositeService(hiveConf)
}
}

object HiveThriftServer2 extends Logging {
@DeveloperApi
def startWithContext(sqlContext: SQLContext): Unit = {
val server = new HiveThriftServer2(sqlContext)

val executionHive = HiveUtils.newClientForExecution(
  sqlContext.sparkContext.conf,
  sqlContext.sessionState.newHadoopConf())

server.init(executionHive.conf)
}
}

{code}

So, {{HiveUtils#newClientForExecution()}} returns an implementation of {{IMetaStoreClient}} which *ALWAYS* points to the Derby metastore (see the docstrings and comments in {{org.apache.spark.sql.hive.HiveUtils#newTemporaryConfiguration()}}).

IMHO, to get correct metadata we need to additionally create another {{IMetaStoreClient}} with {{newClientForMetadata()}}, and pass its {{HiveConf}} to the underlying {{Service}} s.


was (Author: orhideous):
[~rxin], I did a little research for this error.
To invoke {{run()}} → {{runInternal()}} on any 
{{org.apache.hive.service.cli.operation.Operation}} (for example, 
{{GetSchemasOperation}}) we need {{IMetaStoreClient}}. Currently it's taken 
from {{HiveSession}} instance:
{code:java}
public class GetSchemasOperation extends MetadataOperation {
@Override
public void runInternal() throws HiveSQLException {
IMetaStoreClient metastoreClient = 
getParentSession().getMetaStoreClient();
}
}
{code}

All opened {{HiveSession}} s are handled by 
{{org.apache.hive.service.cli.session.SessionManager}} instance.
{{SessionManager}}, among with others, implements 
{{org.apache.hive.service.Service}} interface, and all {{Service}}s initialized 
with same Hive configuration:
{code:java}
public interface Service { 
void init(HiveConf conf);
}
{code}
When {{org.apache.spark.sql.hive.thriftserver.HiveThriftServer2}} initializes, 
all {{org.apache.hive.service.CompositeService}} s receive same {{HiveConf}}:

{code:java}
private[hive] class HiveThriftServer2(sqlContext: SQLContext) extends 
HiveServer2 with ReflectedCompositeService {
override def init(hiveConf: HiveConf) {
initCompositeService(hiveConf)
}
}

object HiveThriftServer2 extends Logging {
@DeveloperApi
def startWithContext(sqlContext: SQLContext): Unit = {
val server = new HiveThriftServer2(sqlContext)

val executionHive = HiveUtils.newClientForExecution(
  sqlContext.sparkContext.conf,
  sqlContext.sessionState.newHadoopConf())

server.init(executionHive.conf)
}
}

{code}

So, {{HiveUtils#newClientForExecution()}} returns implementation of 
{{IMetaStoreClient}} which *ALWAYS* points to derby metastore (see dosctrings 
and comments in 
{{org.apache.spark.sql.hive.HiveUtils#newTemporaryConfiguration()}})

IMHO, to get correct metadata we need to additionally create another 
{{IMetaStoreClient}} with {{newClientForMetadata()}}, and pass it's 
{{HiveConf}} to underlying {{Service}} s.

> Spark Thrift server doesn't return correct JDBC metadata 
> -
>
> Key: SPARK-9686
> URL: https://issues.apache.org/jira/browse/SPARK-9686
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.4.0, 1.4.1, 1.5.0, 1.5.1, 1.5.2
>Reporter: pin_zhang
>Priority: Critical
> Attachments: SPARK-9686.1.patch.txt
>
>
> 1. Start  start-thriftserver.sh
> 2. connect with beeline
> 3. create table
> 4.show tables, the 

[jira] [Closed] (SPARK-22358) ClassNotFoundException at NettyRpcEnv#deserialize when running Spark on Yarn-Cluster

2017-10-26 Thread Congxian Qiu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu closed SPARK-22358.


> ClassNotFoundException at NettyRpcEnv#deserialize when running Spark on 
> Yarn-Cluster
> 
>
> Key: SPARK-22358
> URL: https://issues.apache.org/jira/browse/SPARK-22358
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.2
>Reporter: Congxian Qiu
>
> When running a Spark Streaming program on Yarn-Cluster mode, received the 
> following ERROR message, and the program hanged. I found 
> [Spark-10986](https://issues.apache.org/jira/browse/SPARK-10986) is similiar,
> I can't reproduce the error
> {noformat}
> [2017-10-26 16:53:18,274] ERROR Error while invoking RpcHandler#receive() for 
> one-way message. (org.apache.spark.network.server.TransportRequestHandler)
> java.lang.ClassNotFoundException: org.apache.spark.rpc.RpcAddvess
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:274)
>   at 
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at 
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
>   at 
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
>   at 
> org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:267)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>   at 
> org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:319)
>   at 
> org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1.apply(NettyRpcEnv.scala:266)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>   at 
> org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:265)
>   at 
> org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:597)
>   at 
> org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:586)
>   at 
> org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:176)
>   at 
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:92)
>   at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
>   at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>   at 
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>   at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>   at 

[jira] [Resolved] (SPARK-22358) ClassNotFoundException at NettyRpcEnv#deserialize when running Spark on Yarn-Cluster

2017-10-26 Thread Sean Owen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-22358.
---
Resolution: Cannot Reproduce

You have some strange corruption in your code or build: RpcAddvess
You would need to reproduce it on master or at least a supported branch.
You say you can't reproduce this, so, this should not be a JIRA.

> ClassNotFoundException at NettyRpcEnv#deserialize when running Spark on 
> Yarn-Cluster
> 
>
> Key: SPARK-22358
> URL: https://issues.apache.org/jira/browse/SPARK-22358
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.2
>Reporter: Congxian Qiu
>
> When running a Spark Streaming program on Yarn-Cluster mode, received the 
> following ERROR message, and the program hanged. I found 
> [Spark-10986](https://issues.apache.org/jira/browse/SPARK-10986) is similiar,
> I can't reproduce the error
> {noformat}
> [2017-10-26 16:53:18,274] ERROR Error while invoking RpcHandler#receive() for 
> one-way message. (org.apache.spark.network.server.TransportRequestHandler)
> java.lang.ClassNotFoundException: org.apache.spark.rpc.RpcAddvess
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:274)
>   at 
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at 
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
>   at 
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
>   at 
> org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:267)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>   at 
> org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:319)
>   at 
> org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1.apply(NettyRpcEnv.scala:266)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>   at 
> org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:265)
>   at 
> org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:597)
>   at 
> org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:586)
>   at 
> org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:176)
>   at 
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:92)
>   at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
>   at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>   at 
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>   at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>   

[jira] [Created] (SPARK-22358) ClassNotFoundException at NettyRpcEnv#deserialize when running Spark on Yarn-Cluster

2017-10-26 Thread Congxian Qiu (JIRA)
Congxian Qiu created SPARK-22358:


 Summary: ClassNotFoundException at NettyRpcEnv#deserialize when 
running Spark on Yarn-Cluster
 Key: SPARK-22358
 URL: https://issues.apache.org/jira/browse/SPARK-22358
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.6.2
Reporter: Congxian Qiu


When running a Spark Streaming program in Yarn-Cluster mode, I received the following ERROR message and the program hung. I found [Spark-10986](https://issues.apache.org/jira/browse/SPARK-10986) is similar.

I can't reproduce the error.

{noformat}
[2017-10-26 16:53:18,274] ERROR Error while invoking RpcHandler#receive() for 
one-way message. (org.apache.spark.network.server.TransportRequestHandler)
java.lang.ClassNotFoundException: org.apache.spark.rpc.RpcAddvess
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at 
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
at 
org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:267)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:319)
at 
org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1.apply(NettyRpcEnv.scala:266)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:265)
at 
org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:597)
at 
org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:586)
at 
org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:176)
at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:92)
at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at 
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at 

[jira] [Commented] (SPARK-21725) spark thriftserver insert overwrite table partition select

2017-10-26 Thread xinzhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220239#comment-16220239
 ] 

xinzhang commented on SPARK-21725:
--

I tried Spark (master version) on 21 Aug 2017, and the problem still appeared. I will try it again now and reply with the result I get.
Thanks for your reply. [~mgaido]
[~srowen]

> spark thriftserver insert overwrite table partition select 
> ---
>
> Key: SPARK-21725
> URL: https://issues.apache.org/jira/browse/SPARK-21725
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
> Environment: centos 6.7 spark 2.1  jdk8
>Reporter: xinzhang
>  Labels: spark-sql
>
> use thriftserver create table with partitions.
> session 1:
>  SET hive.default.fileformat=Parquet;create table tmp_10(count bigint) 
> partitioned by (pt string) stored as parquet;
> --ok
>  !exit
> session 2:
>  SET hive.default.fileformat=Parquet;create table tmp_11(count bigint) 
> partitioned by (pt string) stored as parquet; 
> --ok
>  !exit
> session 3:
> --connect the thriftserver
> SET hive.default.fileformat=Parquet;insert overwrite table tmp_10 
> partition(pt='1') select count(1) count from tmp_11;
> --ok
>  !exit
> session 4(do it again):
> --connect the thriftserver
> SET hive.default.fileformat=Parquet;insert overwrite table tmp_10 
> partition(pt='1') select count(1) count from tmp_11;
> --error
>  !exit
> -
> 17/08/14 18:13:42 ERROR SparkExecuteStatementOperation: Error executing 
> query, currentState RUNNING, 
> java.lang.reflect.InvocationTargetException
> ..
> ..
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to move 
> source 
> hdfs://dc-hadoop54:50001/group/user/user1/meta/hive-temp-table/user1.db/tmp_11/.hive-staging_hive_2017-08-14_18-13-39_035_6303339779053
> 512282-2/-ext-1/part-0 to destination 
> hdfs://dc-hadoop54:50001/group/user/user1/meta/hive-temp-table/user1.db/tmp_11/pt=1/part-0
> at org.apache.hadoop.hive.ql.metadata.Hive.moveFile(Hive.java:2644)
> at org.apache.hadoop.hive.ql.metadata.Hive.copyFiles(Hive.java:2711)
> at 
> org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1403)
> at 
> org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1324)
> ... 45 more
> Caused by: java.io.IOException: Filesystem closed
> 
> -
> the doc about the parquet table desc here 
> http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
> Hive metastore Parquet table conversion
> When reading from and writing to Hive metastore Parquet tables, Spark SQL 
> will try to use its own Parquet support instead of Hive SerDe for better 
> performance. This behavior is controlled by the 
> spark.sql.hive.convertMetastoreParquet configuration, and is turned on by 
> default.
> I am confused the problem appear in the table(partitions)  but it is ok with 
> table(with out partitions) . It means spark do not use its own parquet ?
> Maybe someone give any suggest how could I avoid the issue?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21725) spark thriftserver insert overwrite table partition select

2017-10-26 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220228#comment-16220228
 ] 

Marco Gaido commented on SPARK-21725:
-

Please try with the master branch, not with Spark 2.1.2. I used that and I was unable to reproduce the issue. If you manage to reproduce the issue on the current master, then maybe I am doing something wrong trying to reproduce it, even though the steps you posted are pretty precise; in that case, I'd ask you to give more information about the configuration and to check the exact steps to reproduce it.
Otherwise, the only suggestion I can give is to upgrade to 2.3.0 as soon as it becomes available.

> spark thriftserver insert overwrite table partition select 
> ---
>
> Key: SPARK-21725
> URL: https://issues.apache.org/jira/browse/SPARK-21725
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
> Environment: centos 6.7 spark 2.1  jdk8
>Reporter: xinzhang
>  Labels: spark-sql
>
> use thriftserver create table with partitions.
> session 1:
>  SET hive.default.fileformat=Parquet;create table tmp_10(count bigint) 
> partitioned by (pt string) stored as parquet;
> --ok
>  !exit
> session 2:
>  SET hive.default.fileformat=Parquet;create table tmp_11(count bigint) 
> partitioned by (pt string) stored as parquet; 
> --ok
>  !exit
> session 3:
> --connect the thriftserver
> SET hive.default.fileformat=Parquet;insert overwrite table tmp_10 
> partition(pt='1') select count(1) count from tmp_11;
> --ok
>  !exit
> session 4(do it again):
> --connect the thriftserver
> SET hive.default.fileformat=Parquet;insert overwrite table tmp_10 
> partition(pt='1') select count(1) count from tmp_11;
> --error
>  !exit
> -
> 17/08/14 18:13:42 ERROR SparkExecuteStatementOperation: Error executing 
> query, currentState RUNNING, 
> java.lang.reflect.InvocationTargetException
> ..
> ..
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to move 
> source 
> hdfs://dc-hadoop54:50001/group/user/user1/meta/hive-temp-table/user1.db/tmp_11/.hive-staging_hive_2017-08-14_18-13-39_035_6303339779053
> 512282-2/-ext-1/part-0 to destination 
> hdfs://dc-hadoop54:50001/group/user/user1/meta/hive-temp-table/user1.db/tmp_11/pt=1/part-0
> at org.apache.hadoop.hive.ql.metadata.Hive.moveFile(Hive.java:2644)
> at org.apache.hadoop.hive.ql.metadata.Hive.copyFiles(Hive.java:2711)
> at 
> org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1403)
> at 
> org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1324)
> ... 45 more
> Caused by: java.io.IOException: Filesystem closed
> 
> -
> the doc about the parquet table desc here 
> http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
> Hive metastore Parquet table conversion
> When reading from and writing to Hive metastore Parquet tables, Spark SQL 
> will try to use its own Parquet support instead of Hive SerDe for better 
> performance. This behavior is controlled by the 
> spark.sql.hive.convertMetastoreParquet configuration, and is turned on by 
> default.
> I am confused the problem appear in the table(partitions)  but it is ok with 
> table(with out partitions) . It means spark do not use its own parquet ?
> Maybe someone give any suggest how could I avoid the issue?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21725) spark thriftserver insert overwrite table partition select

2017-10-26 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220225#comment-16220225
 ] 

Sean Owen commented on SPARK-21725:
---

[~zhangxin0112zx] there's no reason to expect 2.1.2 was different. He's asking 
you to try the current master branch.

> spark thriftserver insert overwrite table partition select 
> ---
>
> Key: SPARK-21725
> URL: https://issues.apache.org/jira/browse/SPARK-21725
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
> Environment: centos 6.7 spark 2.1  jdk8
>Reporter: xinzhang
>  Labels: spark-sql
>
> use thriftserver create table with partitions.
> session 1:
>  SET hive.default.fileformat=Parquet;create table tmp_10(count bigint) 
> partitioned by (pt string) stored as parquet;
> --ok
>  !exit
> session 2:
>  SET hive.default.fileformat=Parquet;create table tmp_11(count bigint) 
> partitioned by (pt string) stored as parquet; 
> --ok
>  !exit
> session 3:
> --connect the thriftserver
> SET hive.default.fileformat=Parquet;insert overwrite table tmp_10 
> partition(pt='1') select count(1) count from tmp_11;
> --ok
>  !exit
> session 4(do it again):
> --connect the thriftserver
> SET hive.default.fileformat=Parquet;insert overwrite table tmp_10 
> partition(pt='1') select count(1) count from tmp_11;
> --error
>  !exit
> -
> 17/08/14 18:13:42 ERROR SparkExecuteStatementOperation: Error executing 
> query, currentState RUNNING, 
> java.lang.reflect.InvocationTargetException
> ..
> ..
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to move 
> source 
> hdfs://dc-hadoop54:50001/group/user/user1/meta/hive-temp-table/user1.db/tmp_11/.hive-staging_hive_2017-08-14_18-13-39_035_6303339779053
> 512282-2/-ext-1/part-0 to destination 
> hdfs://dc-hadoop54:50001/group/user/user1/meta/hive-temp-table/user1.db/tmp_11/pt=1/part-0
> at org.apache.hadoop.hive.ql.metadata.Hive.moveFile(Hive.java:2644)
> at org.apache.hadoop.hive.ql.metadata.Hive.copyFiles(Hive.java:2711)
> at 
> org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1403)
> at 
> org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1324)
> ... 45 more
> Caused by: java.io.IOException: Filesystem closed
> 
> -
> the doc about the parquet table desc here 
> http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
> Hive metastore Parquet table conversion
> When reading from and writing to Hive metastore Parquet tables, Spark SQL 
> will try to use its own Parquet support instead of Hive SerDe for better 
> performance. This behavior is controlled by the 
> spark.sql.hive.convertMetastoreParquet configuration, and is turned on by 
> default.
> I am confused the problem appear in the table(partitions)  but it is ok with 
> table(with out partitions) . It means spark do not use its own parquet ?
> Maybe someone give any suggest how could I avoid the issue?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21725) spark thriftserver insert overwrite table partition select

2017-10-26 Thread xinzhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220216#comment-16220216
 ] 

xinzhang commented on SPARK-21725:
--

I downloaded Spark 2.1.2. The problem still appears. Could you give me any suggestions to avoid the problem? [~mgaido]


> spark thriftserver insert overwrite table partition select 
> ---
>
> Key: SPARK-21725
> URL: https://issues.apache.org/jira/browse/SPARK-21725
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
> Environment: centos 6.7 spark 2.1  jdk8
>Reporter: xinzhang
>  Labels: spark-sql
>
> use thriftserver create table with partitions.
> session 1:
>  SET hive.default.fileformat=Parquet;create table tmp_10(count bigint) 
> partitioned by (pt string) stored as parquet;
> --ok
>  !exit
> session 2:
>  SET hive.default.fileformat=Parquet;create table tmp_11(count bigint) 
> partitioned by (pt string) stored as parquet; 
> --ok
>  !exit
> session 3:
> --connect the thriftserver
> SET hive.default.fileformat=Parquet;insert overwrite table tmp_10 
> partition(pt='1') select count(1) count from tmp_11;
> --ok
>  !exit
> session 4(do it again):
> --connect the thriftserver
> SET hive.default.fileformat=Parquet;insert overwrite table tmp_10 
> partition(pt='1') select count(1) count from tmp_11;
> --error
>  !exit
> -
> 17/08/14 18:13:42 ERROR SparkExecuteStatementOperation: Error executing 
> query, currentState RUNNING, 
> java.lang.reflect.InvocationTargetException
> ..
> ..
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to move 
> source 
> hdfs://dc-hadoop54:50001/group/user/user1/meta/hive-temp-table/user1.db/tmp_11/.hive-staging_hive_2017-08-14_18-13-39_035_6303339779053
> 512282-2/-ext-1/part-0 to destination 
> hdfs://dc-hadoop54:50001/group/user/user1/meta/hive-temp-table/user1.db/tmp_11/pt=1/part-0
> at org.apache.hadoop.hive.ql.metadata.Hive.moveFile(Hive.java:2644)
> at org.apache.hadoop.hive.ql.metadata.Hive.copyFiles(Hive.java:2711)
> at 
> org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1403)
> at 
> org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1324)
> ... 45 more
> Caused by: java.io.IOException: Filesystem closed
> 
> -
> the doc about the parquet table desc here 
> http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
> Hive metastore Parquet table conversion
> When reading from and writing to Hive metastore Parquet tables, Spark SQL 
> will try to use its own Parquet support instead of Hive SerDe for better 
> performance. This behavior is controlled by the 
> spark.sql.hive.convertMetastoreParquet configuration, and is turned on by 
> default.
> I am confused the problem appear in the table(partitions)  but it is ok with 
> table(with out partitions) . It means spark do not use its own parquet ?
> Maybe someone give any suggest how could I avoid the issue?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-15905) Driver hung while writing to console progress bar

2017-10-26 Thread Denis Gabaydulin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220167#comment-16220167
 ] 

Denis Gabaydulin edited comment on SPARK-15905 at 10/26/17 8:48 AM:


SPARK 2.1.0
CentOS Linux release 7.3.1611 (Core)
jdk180_64_102

Not sure I've got the same issue, but I have at least two threads that are 
blocked on a logger. The first is the main thread (where I called the 
unpersist() method).

{noformat}
Thread 30581: (state = BLOCKED)
 - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) 
@bci=12, line=204 (Compiled frame)
 - org.apache.log4j.Category.forcedLog(java.lang.String, 
org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, 
line=391 (Compiled frame)
 - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, 
java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Compiled frame)
 - org.slf4j.impl.Log4jLoggerAdapter.warn(java.lang.String, 
java.lang.Throwable) @bci=12, line=479 (Interpreted frame)
 - 
org.apache.spark.internal.Logging$class.logWarning(org.apache.spark.internal.Logging,
 scala.Function0, java.lang.Throwable) @bci=30, line=87 (Interpreted frame)
 - org.apache.spark.rpc.RpcEndpointRef.logWarning(scala.Function0, 
java.lang.Throwable) @bci=3, line=30 (Interpreted frame)
 - org.apache.spark.rpc.RpcEndpointRef.askWithRetry(java.lang.Object, 
org.apache.spark.rpc.RpcTimeout, scala.reflect.ClassTag) @bci=32, line=111 
(Interpreted frame)
 - org.apache.spark.rpc.RpcEndpointRef.askWithRetry(java.lang.Object, 
scala.reflect.ClassTag) @bci=7, line=78 (Compiled frame)
 - org.apache.spark.storage.BlockManagerMaster.removeRdd(int, boolean) @bci=21, 
line=119 (Compiled frame)
 - org.apache.spark.SparkContext.unpersistRDD(int, boolean) @bci=12, line=1705 
(Compiled frame)
 - org.apache.spark.rdd.RDD.unpersist(boolean) @bci=21, line=216 (Interpreted 
frame)
 - 
org.apache.spark.sql.execution.CacheManager$$anonfun$uncacheQuery$1.apply$mcZ$sp()
 @bci=70, line=116 (Interpreted frame)
 - org.apache.spark.sql.execution.CacheManager$$anonfun$uncacheQuery$1.apply() 
@bci=1, line=111 (Interpreted frame)
 - org.apache.spark.sql.execution.CacheManager$$anonfun$uncacheQuery$1.apply() 
@bci=1, line=111 (Interpreted frame)
 - org.apache.spark.sql.execution.CacheManager.writeLock(scala.Function0) 
@bci=13, line=65 (Compiled frame)
 - 
org.apache.spark.sql.execution.CacheManager.uncacheQuery(org.apache.spark.sql.Dataset,
 boolean) @bci=11, line=111 (Interpreted frame)
 - org.apache.spark.sql.Dataset.unpersist(boolean) @bci=12, line=2526 
(Interpreted frame)
 - org.apache.spark.sql.Dataset.unpersist() @bci=2, line=2536 (Interpreted 
frame)
 - 
ru.ok.dwh.analytics.user.kpi.service.KpiBaseMetricDailyAggregator.complete(boolean)
 @bci=4, line=68 (Interpreted frame)
 - ru.ok.dwh.analytics.service.v2.BaseSparkDatasetTransformation.complete() 
@bci=2, line=70 (Interpreted frame)
 - ru.ok.dwh.analytics.application.StandardApplication.run(java.lang.String[]) 
@bci=232, line=109 (Interpreted frame)
 - 
ru.ok.dwh.analytics.application.kpi.KpiVideoBaseMetricApp.main(java.lang.String[])
 @bci=51, line=53 (Interpreted frame)
 - sun.reflect.NativeMethodAccessorImpl.invoke0(java.lang.reflect.Method, 
java.lang.Object, java.lang.Object[]) @bci=0 (Interpreted frame)
 - sun.reflect.NativeMethodAccessorImpl.invoke(java.lang.Object, 
java.lang.Object[]) @bci=100, line=62 (Interpreted frame)
 - sun.reflect.DelegatingMethodAccessorImpl.invoke(java.lang.Object, 
java.lang.Object[]) @bci=6, line=43 (Interpreted frame)
 - java.lang.reflect.Method.invoke(java.lang.Object, java.lang.Object[]) 
@bci=56, line=498 (Interpreted frame)
 - 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(scala.collection.Seq,
 scala.collection.Seq, scala.collection.mutable.Map, java.lang.String, boolean) 
@bci=663, line=738 (Interpreted frame)
 - 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(org.apache.spark.deploy.SparkSubmitArguments,
 scala.collection.Seq, scala.collection.Seq, scala.collection.mutable.Map, 
java.lang.String) @bci=18, line=187 (Interpreted frame)
 - 
org.apache.spark.deploy.SparkSubmit$.submit(org.apache.spark.deploy.SparkSubmitArguments)
 @bci=245, line=212 (Interpreted frame)
 - org.apache.spark.deploy.SparkSubmit$.main(java.lang.String[]) @bci=76, 
line=126 (Interpreted frame)
 - org.apache.spark.deploy.SparkSubmit.main(java.lang.String[]) @bci=4 
(Interpreted frame)
{noformat}

And a couple of Spark-internal threads are blocked in the same place:

{noformat}
Thread 30910: (state = BLOCKED)
 - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) 
@bci=12, line=204 (Compiled frame)
 - org.apache.log4j.Category.forcedLog(java.lang.String, 
org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, 
line=391 (Compiled frame)
 - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, 

[jira] [Commented] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter

2017-10-26 Thread Weichen Xu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220169#comment-16220169
 ] 

Weichen Xu commented on SPARK-22357:


[~jerryshao] code formatted. This bug was reported here: 
https://github.com/apache/spark/pull/19439
You can ask [~imatiach] for more context.

> SparkContext.binaryFiles ignore minPartitions parameter
> ---
>
> Key: SPARK-22357
> URL: https://issues.apache.org/jira/browse/SPARK-22357
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.0
>Reporter: Weichen Xu
>
> this is a bug in binaryFiles - even though we give it the partitions, 
> binaryFiles ignores it.
> This is a bug introduced in spark 2.1 from spark 2.0, in file 
> PortableDataStream.scala the argument “minPartitions” is no longer used (with 
> the push to master on 11/7/6):
> {code}
> /**
> Allow minPartitions set by end-user in order to keep compatibility with old 
> Hadoop API
> which is set through setMaxSplitSize
> */
> def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: 
> Int) {
> val defaultMaxSplitBytes = 
> sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
> val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
> val defaultParallelism = sc.defaultParallelism
> val files = listStatus(context).asScala
> val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + 
> openCostInBytes).sum
> val bytesPerCore = totalBytes / defaultParallelism
> val maxSplitSize = Math.min(defaultMaxSplitBytes, 
> Math.max(openCostInBytes, bytesPerCore))
> super.setMaxSplitSize(maxSplitSize)
> }
> {code}
> The code previously, in version 2.0, was:
> {code}
> def setMinPartitions(context: JobContext, minPartitions: Int) {
> val totalLen = 
> listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
> val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 
> 1.0)).toLong
> super.setMaxSplitSize(maxSplitSize)
> }
> {code}
> The new code is very smart, but it ignores what the user passes in and uses 
> the data size, which is kind of a breaking change in some sense
> In our specific case this was a problem, because we initially read in just 
> the files names and only after that the dataframe becomes very large, when 
> reading in the images themselves – and in this case the new code does not 
> handle the partitioning very well.
> I’m not sure if it can be easily fixed because I don’t understand the full 
> context of the change in spark (but at the very least the unused parameter 
> should be removed to avoid confusion).
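
Until this is addressed, a possible user-side workaround (a sketch only, not part of any proposed patch; the path and partition count below are placeholders) is to repartition the RDD returned by binaryFiles, since the minPartitions hint itself is ignored by the 2.1/2.2 code shown above:

{code}
// Workaround sketch: force the desired parallelism explicitly after the read.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("binaryFiles-workaround").getOrCreate()
val sc = spark.sparkContext

val desiredPartitions = 64  // placeholder value
val images = sc
  .binaryFiles("hdfs:///path/to/images", desiredPartitions) // hint is ignored in 2.1/2.2
  .repartition(desiredPartitions)                           // so force the partitioning here
{code}

This adds a shuffle, so it is only a stopgap until setMinPartitions honors the parameter again.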



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-15905) Driver hung while writing to console progress bar

2017-10-26 Thread Denis Gabaydulin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220167#comment-16220167
 ] 

Denis Gabaydulin edited comment on SPARK-15905 at 10/26/17 8:42 AM:


Not sure I've got the same issue, but I have at least two threads that are 
blocked on a logger. The first is the main thread (where I called the 
unpersist() method).

{noformat}
Thread 30581: (state = BLOCKED)
 - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) 
@bci=12, line=204 (Compiled frame)
 - org.apache.log4j.Category.forcedLog(java.lang.String, 
org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, 
line=391 (Compiled frame)
 - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, 
java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Compiled frame)
 - org.slf4j.impl.Log4jLoggerAdapter.warn(java.lang.String, 
java.lang.Throwable) @bci=12, line=479 (Interpreted frame)
 - 
org.apache.spark.internal.Logging$class.logWarning(org.apache.spark.internal.Logging,
 scala.Function0, java.lang.Throwable) @bci=30, line=87 (Interpreted frame)
 - org.apache.spark.rpc.RpcEndpointRef.logWarning(scala.Function0, 
java.lang.Throwable) @bci=3, line=30 (Interpreted frame)
 - org.apache.spark.rpc.RpcEndpointRef.askWithRetry(java.lang.Object, 
org.apache.spark.rpc.RpcTimeout, scala.reflect.ClassTag) @bci=32, line=111 
(Interpreted frame)
 - org.apache.spark.rpc.RpcEndpointRef.askWithRetry(java.lang.Object, 
scala.reflect.ClassTag) @bci=7, line=78 (Compiled frame)
 - org.apache.spark.storage.BlockManagerMaster.removeRdd(int, boolean) @bci=21, 
line=119 (Compiled frame)
 - org.apache.spark.SparkContext.unpersistRDD(int, boolean) @bci=12, line=1705 
(Compiled frame)
 - org.apache.spark.rdd.RDD.unpersist(boolean) @bci=21, line=216 (Interpreted 
frame)
 - 
org.apache.spark.sql.execution.CacheManager$$anonfun$uncacheQuery$1.apply$mcZ$sp()
 @bci=70, line=116 (Interpreted frame)
 - org.apache.spark.sql.execution.CacheManager$$anonfun$uncacheQuery$1.apply() 
@bci=1, line=111 (Interpreted frame)
 - org.apache.spark.sql.execution.CacheManager$$anonfun$uncacheQuery$1.apply() 
@bci=1, line=111 (Interpreted frame)
 - org.apache.spark.sql.execution.CacheManager.writeLock(scala.Function0) 
@bci=13, line=65 (Compiled frame)
 - 
org.apache.spark.sql.execution.CacheManager.uncacheQuery(org.apache.spark.sql.Dataset,
 boolean) @bci=11, line=111 (Interpreted frame)
 - org.apache.spark.sql.Dataset.unpersist(boolean) @bci=12, line=2526 
(Interpreted frame)
 - org.apache.spark.sql.Dataset.unpersist() @bci=2, line=2536 (Interpreted 
frame)
 - 
ru.ok.dwh.analytics.user.kpi.service.KpiBaseMetricDailyAggregator.complete(boolean)
 @bci=4, line=68 (Interpreted frame)
 - ru.ok.dwh.analytics.service.v2.BaseSparkDatasetTransformation.complete() 
@bci=2, line=70 (Interpreted frame)
 - ru.ok.dwh.analytics.application.StandardApplication.run(java.lang.String[]) 
@bci=232, line=109 (Interpreted frame)
 - 
ru.ok.dwh.analytics.application.kpi.KpiVideoBaseMetricApp.main(java.lang.String[])
 @bci=51, line=53 (Interpreted frame)
 - sun.reflect.NativeMethodAccessorImpl.invoke0(java.lang.reflect.Method, 
java.lang.Object, java.lang.Object[]) @bci=0 (Interpreted frame)
 - sun.reflect.NativeMethodAccessorImpl.invoke(java.lang.Object, 
java.lang.Object[]) @bci=100, line=62 (Interpreted frame)
 - sun.reflect.DelegatingMethodAccessorImpl.invoke(java.lang.Object, 
java.lang.Object[]) @bci=6, line=43 (Interpreted frame)
 - java.lang.reflect.Method.invoke(java.lang.Object, java.lang.Object[]) 
@bci=56, line=498 (Interpreted frame)
 - 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(scala.collection.Seq,
 scala.collection.Seq, scala.collection.mutable.Map, java.lang.String, boolean) 
@bci=663, line=738 (Interpreted frame)
 - 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(org.apache.spark.deploy.SparkSubmitArguments,
 scala.collection.Seq, scala.collection.Seq, scala.collection.mutable.Map, 
java.lang.String) @bci=18, line=187 (Interpreted frame)
 - 
org.apache.spark.deploy.SparkSubmit$.submit(org.apache.spark.deploy.SparkSubmitArguments)
 @bci=245, line=212 (Interpreted frame)
 - org.apache.spark.deploy.SparkSubmit$.main(java.lang.String[]) @bci=76, 
line=126 (Interpreted frame)
 - org.apache.spark.deploy.SparkSubmit.main(java.lang.String[]) @bci=4 
(Interpreted frame)
{noformat}

And a couple of Spark-internal threads are blocked in the same place:

{noformat}
Thread 30910: (state = BLOCKED)
 - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) 
@bci=12, line=204 (Compiled frame)
 - org.apache.log4j.Category.forcedLog(java.lang.String, 
org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, 
line=391 (Compiled frame)
 - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, 
java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Compiled frame)
 - 

[jira] [Updated] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter

2017-10-26 Thread Weichen Xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichen Xu updated SPARK-22357:
---
Description: 
this is a bug in binaryFiles - even though we give it the partitions, 
binaryFiles ignores it.
This is a bug introduced in spark 2.1 from spark 2.0, in file 
PortableDataStream.scala the argument “minPartitions” is no longer used (with 
the push to master on 11/7/6):

{code}
/**
Allow minPartitions set by end-user in order to keep compatibility with old 
Hadoop API
which is set through setMaxSplitSize
*/
def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) 
{
val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
val defaultParallelism = sc.defaultParallelism
val files = listStatus(context).asScala
val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + 
openCostInBytes).sum
val bytesPerCore = totalBytes / defaultParallelism
val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, 
bytesPerCore))
super.setMaxSplitSize(maxSplitSize)
}
{code}

The code previously, in version 2.0, was:
{code}
def setMinPartitions(context: JobContext, minPartitions: Int) {
val totalLen = 
listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
super.setMaxSplitSize(maxSplitSize)
}
{code}

The new code is very smart, but it ignores what the user passes in and uses the 
data size, which is kind of a breaking change in some sense
In our specific case this was a problem, because we initially read in just the 
files names and only after that the dataframe becomes very large, when reading 
in the images themselves – and in this case the new code does not handle the 
partitioning very well.
I’m not sure if it can be easily fixed because I don’t understand the full 
context of the change in spark (but at the very least the unused parameter 
should be removed to avoid confusion).



  was:
this is a bug in binaryFiles - even though we give it the partitions, 
binaryFiles ignores it.
This is a bug introduced in spark 2.1 from spark 2.0, in file 
PortableDataStream.scala the argument “minPartitions” is no longer used (with 
the push to master on 11/7/6):

/**

Allow minPartitions set by end-user in order to keep compatibility with old 
Hadoop API
which is set through setMaxSplitSize
*/
def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) 
{
val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
val defaultParallelism = sc.defaultParallelism
val files = listStatus(context).asScala
val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + 
openCostInBytes).sum
val bytesPerCore = totalBytes / defaultParallelism
val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, 
bytesPerCore))
super.setMaxSplitSize(maxSplitSize)
}
The code previously, in version 2.0, was:

def setMinPartitions(context: JobContext, minPartitions: Int) {
val totalLen = 
listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
super.setMaxSplitSize(maxSplitSize)
}

The new code is very smart, but it ignores what the user passes in and uses the 
data size, which is kind of a breaking change in some sense
In our specific case this was a problem, because we initially read in just the 
files names and only after that the dataframe becomes very large, when reading 
in the images themselves – and in this case the new code does not handle the 
partitioning very well.
I’m not sure if it can be easily fixed because I don’t understand the full 
context of the change in spark (but at the very least the unused parameter 
should be removed to avoid confusion).




> SparkContext.binaryFiles ignore minPartitions parameter
> ---
>
> Key: SPARK-22357
> URL: https://issues.apache.org/jira/browse/SPARK-22357
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.0
>Reporter: Weichen Xu
>
> this is a bug in binaryFiles - even though we give it the partitions, 
> binaryFiles ignores it.
> This is a bug introduced in spark 2.1 from spark 2.0, in file 
> PortableDataStream.scala the argument “minPartitions” is no longer used (with 
> the push to master on 11/7/6):
> {code}
> /**
> Allow minPartitions set by end-user in order to keep compatibility with old 
> Hadoop API
> which is set through setMaxSplitSize
> */
> def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: 
> Int) {
> val defaultMaxSplitBytes 

[jira] [Commented] (SPARK-15905) Driver hung while writing to console progress bar

2017-10-26 Thread Denis Gabaydulin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220167#comment-16220167
 ] 

Denis Gabaydulin commented on SPARK-15905:
--

Not sure I've got the same issue, but I have at least two threads that are 
blocked on a logger. The first is the main thread (where I called the 
unpersist() method).

{noformat}
Thread 30581: (state = BLOCKED)
 - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) 
@bci=12, line=204 (Compiled frame)
 - org.apache.log4j.Category.forcedLog(java.lang.String, 
org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, 
line=391 (Compiled frame)
 - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, 
java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Compiled frame)
 - org.slf4j.impl.Log4jLoggerAdapter.warn(java.lang.String, 
java.lang.Throwable) @bci=12, line=479 (Interpreted frame)
 - 
org.apache.spark.internal.Logging$class.logWarning(org.apache.spark.internal.Logging,
 scala.Function0, java.lang.Throwable) @bci=30, line=87 (Interpreted frame)
 - org.apache.spark.rpc.RpcEndpointRef.logWarning(scala.Function0, 
java.lang.Throwable) @bci=3, line=30 (Interpreted frame)
 - org.apache.spark.rpc.RpcEndpointRef.askWithRetry(java.lang.Object, 
org.apache.spark.rpc.RpcTimeout, scala.reflect.ClassTag) @bci=32, line=111 
(Interpreted frame)
 - org.apache.spark.rpc.RpcEndpointRef.askWithRetry(java.lang.Object, 
scala.reflect.ClassTag) @bci=7, line=78 (Compiled frame)
 - org.apache.spark.storage.BlockManagerMaster.removeRdd(int, boolean) @bci=21, 
line=119 (Compiled frame)
 - org.apache.spark.SparkContext.unpersistRDD(int, boolean) @bci=12, line=1705 
(Compiled frame)
 - org.apache.spark.rdd.RDD.unpersist(boolean) @bci=21, line=216 (Interpreted 
frame)
 - 
org.apache.spark.sql.execution.CacheManager$$anonfun$uncacheQuery$1.apply$mcZ$sp()
 @bci=70, line=116 (Interpreted frame)
 - org.apache.spark.sql.execution.CacheManager$$anonfun$uncacheQuery$1.apply() 
@bci=1, line=111 (Interpreted frame)
 - org.apache.spark.sql.execution.CacheManager$$anonfun$uncacheQuery$1.apply() 
@bci=1, line=111 (Interpreted frame)
 - org.apache.spark.sql.execution.CacheManager.writeLock(scala.Function0) 
@bci=13, line=65 (Compiled frame)
 - 
org.apache.spark.sql.execution.CacheManager.uncacheQuery(org.apache.spark.sql.Dataset,
 boolean) @bci=11, line=111 (Interpreted frame)
 - org.apache.spark.sql.Dataset.unpersist(boolean) @bci=12, line=2526 
(Interpreted frame)
 - org.apache.spark.sql.Dataset.unpersist() @bci=2, line=2536 (Interpreted 
frame)
 - 
ru.ok.dwh.analytics.user.kpi.service.KpiBaseMetricDailyAggregator.complete(boolean)
 @bci=4, line=68 (Interpreted frame)
 - ru.ok.dwh.analytics.service.v2.BaseSparkDatasetTransformation.complete() 
@bci=2, line=70 (Interpreted frame)
 - ru.ok.dwh.analytics.application.StandardApplication.run(java.lang.String[]) 
@bci=232, line=109 (Interpreted frame)
 - 
ru.ok.dwh.analytics.application.kpi.KpiVideoBaseMetricApp.main(java.lang.String[])
 @bci=51, line=53 (Interpreted frame)
 - sun.reflect.NativeMethodAccessorImpl.invoke0(java.lang.reflect.Method, 
java.lang.Object, java.lang.Object[]) @bci=0 (Interpreted frame)
 - sun.reflect.NativeMethodAccessorImpl.invoke(java.lang.Object, 
java.lang.Object[]) @bci=100, line=62 (Interpreted frame)
 - sun.reflect.DelegatingMethodAccessorImpl.invoke(java.lang.Object, 
java.lang.Object[]) @bci=6, line=43 (Interpreted frame)
 - java.lang.reflect.Method.invoke(java.lang.Object, java.lang.Object[]) 
@bci=56, line=498 (Interpreted frame)
 - 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(scala.collection.Seq,
 scala.collection.Seq, scala.collection.mutable.Map, java.lang.String, boolean) 
@bci=663, line=738 (Interpreted frame)
 - 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(org.apache.spark.deploy.SparkSubmitArguments,
 scala.collection.Seq, scala.collection.Seq, scala.collection.mutable.Map, 
java.lang.String) @bci=18, line=187 (Interpreted frame)
 - 
org.apache.spark.deploy.SparkSubmit$.submit(org.apache.spark.deploy.SparkSubmitArguments)
 @bci=245, line=212 (Interpreted frame)
 - org.apache.spark.deploy.SparkSubmit$.main(java.lang.String[]) @bci=76, 
line=126 (Interpreted frame)
 - org.apache.spark.deploy.SparkSubmit.main(java.lang.String[]) @bci=4 
(Interpreted frame)
{noformat}

And a couple of Spark-internal threads are blocked in the same place:

{noformat}
Thread 30910: (state = BLOCKED)
 - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) 
@bci=12, line=204 (Compiled frame)
 - org.apache.log4j.Category.forcedLog(java.lang.String, 
org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, 
line=391 (Compiled frame)
 - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, 
java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Compiled frame)
 - 

[jira] [Commented] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter

2017-10-26 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220160#comment-16220160
 ] 

Saisai Shao commented on SPARK-22357:
-

[~WeichenXu123] would you please format the code in the JIRA description to make 
it easier to read :).

> SparkContext.binaryFiles ignore minPartitions parameter
> ---
>
> Key: SPARK-22357
> URL: https://issues.apache.org/jira/browse/SPARK-22357
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.0
>Reporter: Weichen Xu
>
> this is a bug in binaryFiles - even though we give it the partitions, 
> binaryFiles ignores it.
> This is a bug introduced in spark 2.1 from spark 2.0, in file 
> PortableDataStream.scala the argument “minPartitions” is no longer used (with 
> the push to master on 11/7/6):
> /**
> Allow minPartitions set by end-user in order to keep compatibility with old 
> Hadoop API
> which is set through setMaxSplitSize
> */
> def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: 
> Int) {
> val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
> val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
> val defaultParallelism = sc.defaultParallelism
> val files = listStatus(context).asScala
> val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + 
> openCostInBytes).sum
> val bytesPerCore = totalBytes / defaultParallelism
> val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, 
> bytesPerCore))
> super.setMaxSplitSize(maxSplitSize)
> }
> The code previously, in version 2.0, was:
> def setMinPartitions(context: JobContext, minPartitions: Int) {
> val totalLen = 
> listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
> val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
> super.setMaxSplitSize(maxSplitSize)
> }
> The new code is very smart, but it ignores what the user passes in and uses 
> the data size, which is kind of a breaking change in some sense
> In our specific case this was a problem, because we initially read in just 
> the files names and only after that the dataframe becomes very large, when 
> reading in the images themselves – and in this case the new code does not 
> handle the partitioning very well.
> I’m not sure if it can be easily fixed because I don’t understand the full 
> context of the change in spark (but at the very least the unused parameter 
> should be removed to avoid confusion).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter

2017-10-26 Thread Weichen Xu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220152#comment-16220152
 ] 

Weichen Xu commented on SPARK-22357:


cc [~jerryshao]

> SparkContext.binaryFiles ignore minPartitions parameter
> ---
>
> Key: SPARK-22357
> URL: https://issues.apache.org/jira/browse/SPARK-22357
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.0
>Reporter: Weichen Xu
>
> this is a bug in binaryFiles - even though we give it the partitions, 
> binaryFiles ignores it.
> This is a bug introduced in spark 2.1 from spark 2.0, in file 
> PortableDataStream.scala the argument “minPartitions” is no longer used (with 
> the push to master on 11/7/6):
> /**
> Allow minPartitions set by end-user in order to keep compatibility with old 
> Hadoop API
> which is set through setMaxSplitSize
> */
> def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: 
> Int) {
> val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
> val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
> val defaultParallelism = sc.defaultParallelism
> val files = listStatus(context).asScala
> val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + 
> openCostInBytes).sum
> val bytesPerCore = totalBytes / defaultParallelism
> val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, 
> bytesPerCore))
> super.setMaxSplitSize(maxSplitSize)
> }
> The code previously, in version 2.0, was:
> def setMinPartitions(context: JobContext, minPartitions: Int) {
> val totalLen = 
> listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
> val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
> super.setMaxSplitSize(maxSplitSize)
> }
> The new code is very smart, but it ignores what the user passes in and uses 
> the data size, which is kind of a breaking change in some sense
> In our specific case this was a problem, because we initially read in just 
> the files names and only after that the dataframe becomes very large, when 
> reading in the images themselves – and in this case the new code does not 
> handle the partitioning very well.
> I’m not sure if it can be easily fixed because I don’t understand the full 
> context of the change in spark (but at the very least the unused parameter 
> should be removed to avoid confusion).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter

2017-10-26 Thread Weichen Xu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220147#comment-16220147
 ] 

Weichen Xu commented on SPARK-22357:


cc [~imatiach] [~liancheng]

> SparkContext.binaryFiles ignore minPartitions parameter
> ---
>
> Key: SPARK-22357
> URL: https://issues.apache.org/jira/browse/SPARK-22357
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.2, 2.2.0
>Reporter: Weichen Xu
>
> this is a bug in binaryFiles - even though we give it the partitions, 
> binaryFiles ignores it.
> This is a bug introduced in spark 2.1 from spark 2.0, in file 
> PortableDataStream.scala the argument “minPartitions” is no longer used (with 
> the push to master on 11/7/6):
> /**
> Allow minPartitions set by end-user in order to keep compatibility with old 
> Hadoop API
> which is set through setMaxSplitSize
> */
> def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: 
> Int) {
> val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
> val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
> val defaultParallelism = sc.defaultParallelism
> val files = listStatus(context).asScala
> val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + 
> openCostInBytes).sum
> val bytesPerCore = totalBytes / defaultParallelism
> val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, 
> bytesPerCore))
> super.setMaxSplitSize(maxSplitSize)
> }
> The code previously, in version 2.0, was:
> def setMinPartitions(context: JobContext, minPartitions: Int) {
> val totalLen = 
> listStatus(context).asScala.filterNot(.isDirectory).map(.getLen).sum
> val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
> super.setMaxSplitSize(maxSplitSize)
> }
> The new code is very smart, but it ignores what the user passes in and uses 
> the data size, which is kind of a breaking change in some sense
> In our specific case this was a problem, because we initially read in just 
> the files names and only after that the dataframe becomes very large, when 
> reading in the images themselves – and in this case the new code does not 
> handle the partitioning very well.
> I’m not sure if it can be easily fixed because I don’t understand the full 
> context of the change in spark (but at the very least the unused parameter 
> should be removed to avoid confusion).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22357) SparkContext.binaryFiles ignore minPartitions parameter

2017-10-26 Thread Weichen Xu (JIRA)
Weichen Xu created SPARK-22357:
--

 Summary: SparkContext.binaryFiles ignore minPartitions parameter
 Key: SPARK-22357
 URL: https://issues.apache.org/jira/browse/SPARK-22357
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0, 2.1.2
Reporter: Weichen Xu


this is a bug in binaryFiles - even though we give it the partitions, 
binaryFiles ignores it.
This is a bug introduced in spark 2.1 from spark 2.0, in file 
PortableDataStream.scala the argument “minPartitions” is no longer used (with 
the push to master on 11/7/6):

/**

Allow minPartitions set by end-user in order to keep compatibility with old 
Hadoop API
which is set through setMaxSplitSize
*/
def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) 
{
val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
val defaultParallelism = sc.defaultParallelism
val files = listStatus(context).asScala
val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + 
openCostInBytes).sum
val bytesPerCore = totalBytes / defaultParallelism
val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, 
bytesPerCore))
super.setMaxSplitSize(maxSplitSize)
}
The code previously, in version 2.0, was:

def setMinPartitions(context: JobContext, minPartitions: Int) {
val totalLen = 
listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
super.setMaxSplitSize(maxSplitSize)
}

The new code is very smart, but it ignores what the user passes in and uses the 
data size, which is kind of a breaking change in some sense
In our specific case this was a problem, because we initially read in just the 
files names and only after that the dataframe becomes very large, when reading 
in the images themselves – and in this case the new code does not handle the 
partitioning very well.
I’m not sure if it can be easily fixed because I don’t understand the full 
context of the change in spark (but at the very least the unused parameter 
should be removed to avoid confusion).
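
For comparison, here is a rough sketch of how the 2.1+ size-based calculation could also take the caller's minPartitions into account. This is only an illustration of one possible reconciliation, written as a standalone helper with made-up example numbers; it is not the actual fix adopted upstream.

{code}
// Illustrative helper: compute a max split size that honors both the size-based
// heuristic from 2.1+ and the caller's minPartitions hint.
object SplitSizeSketch {
  def maxSplitSize(totalBytes: Long,
                   minPartitions: Int,
                   defaultParallelism: Int,
                   defaultMaxSplitBytes: Long,
                   openCostInBytes: Long): Long = {
    // Divide by the larger of the user's hint and the default parallelism,
    // so asking for more partitions can only shrink the split size.
    val divisor = math.max(math.max(minPartitions, defaultParallelism), 1)
    val bytesPerCore = totalBytes / divisor
    math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
  }

  def main(args: Array[String]): Unit = {
    // Example: 10 GiB of files, minPartitions = 200, default parallelism 8,
    // 128 MiB default max split, 4 MiB open cost (all made-up numbers).
    val size = maxSplitSize(10L * 1024 * 1024 * 1024, 200, 8,
                            128L * 1024 * 1024, 4L * 1024 * 1024)
    println(s"maxSplitSize = $size bytes") // ~51 MiB, driven by minPartitions = 200
  }
}
{code}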





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-21840) Allow multiple SparkSubmit invocations in same JVM without polluting system properties

2017-10-26 Thread Saisai Shao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Saisai Shao reassigned SPARK-21840:
---

Assignee: Marcelo Vanzin

> Allow multiple SparkSubmit invocations in same JVM without polluting system 
> properties
> --
>
> Key: SPARK-21840
> URL: https://issues.apache.org/jira/browse/SPARK-21840
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Marcelo Vanzin
>Assignee: Marcelo Vanzin
>Priority: Minor
> Fix For: 2.3.0
>
>
> Filing this as a sub-task of SPARK-11035; this feature was discussed as part 
> of the PR currently attached to that bug.
> Basically, to allow the launcher library to run applications in-process, the 
> easiest way is for it to run the {{SparkSubmit}} class. But that class 
> currently propagates configuration to applications by modifying system 
> properties.
> That means that when launching multiple applications in that manner in the 
> same JVM, the configuration of the first application may leak into the second 
> application (or to any other invocation of `new SparkConf()` for that matter).
> This feature is about breaking out the fix for this particular issue from the 
> PR linked to SPARK-11035. With the changes in SPARK-21728, the implementation 
> can even be further enhanced by providing an actual {{SparkConf}} instance to 
> the application, instead of opaque maps.
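
A minimal sketch of why the system-property route leaks between applications in one JVM (the property name below is made up; the relevant behavior is that a default SparkConf picks up any spark.* system property):

{code}
// Illustration of the leak described above, with both "applications" simulated
// in the same JVM. The property name is illustrative only.
import org.apache.spark.SparkConf

object ConfLeakDemo {
  def main(args: Array[String]): Unit = {
    // First application (or SparkSubmit acting on its behalf) propagates its
    // configuration by mutating JVM-wide system properties.
    System.setProperty("spark.app.customSetting", "from-first-app")

    // A second application later constructs a fresh SparkConf in the same JVM
    // and silently inherits the first application's setting.
    val conf = new SparkConf() // loadDefaults = true reads spark.* system properties
    println(conf.getOption("spark.app.customSetting")) // prints Some(from-first-app)
  }
}
{code}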



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-21840) Allow multiple SparkSubmit invocations in same JVM without polluting system properties

2017-10-26 Thread Saisai Shao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Saisai Shao resolved SPARK-21840.
-
   Resolution: Fixed
Fix Version/s: 2.3.0

Issue resolved by pull request 19519
[https://github.com/apache/spark/pull/19519]

> Allow multiple SparkSubmit invocations in same JVM without polluting system 
> properties
> --
>
> Key: SPARK-21840
> URL: https://issues.apache.org/jira/browse/SPARK-21840
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Marcelo Vanzin
>Priority: Minor
> Fix For: 2.3.0
>
>
> Filing this as a sub-task of SPARK-11035; this feature was discussed as part 
> of the PR currently attached to that bug.
> Basically, to allow the launcher library to run applications in-process, the 
> easiest way is for it to run the {{SparkSubmit}} class. But that class 
> currently propagates configuration to applications by modifying system 
> properties.
> That means that when launching multiple applications in that manner in the 
> same JVM, the configuration of the first application may leak into the second 
> application (or to any other invocation of `new SparkConf()` for that matter).
> This feature is about breaking out the fix for this particular issue from the 
> PR linked to SPARK-11035. With the changes in SPARK-21728, the implementation 
> can even be further enhanced by providing an actual {{SparkConf}} instance to 
> the application, instead of opaque maps.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-22308) Support unit tests of spark code using ScalaTest using suites other than FunSuite

2017-10-26 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-22308.
-
   Resolution: Fixed
 Assignee: Nathan Kronenfeld
Fix Version/s: 2.3.0

> Support unit tests of spark code using ScalaTest using suites other than 
> FunSuite
> -
>
> Key: SPARK-22308
> URL: https://issues.apache.org/jira/browse/SPARK-22308
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, Spark Core, SQL, Tests
>Affects Versions: 2.2.0
>Reporter: Nathan Kronenfeld
>Assignee: Nathan Kronenfeld
>Priority: Minor
>  Labels: scalatest, test-suite, test_issue
> Fix For: 2.3.0
>
>
> External codebases that have spark code can test it using SharedSparkContext, 
> no matter how they write their scalatests - basing on FunSuite, FunSpec, 
> FlatSpec, or WordSpec.
> SharedSQLContext only supports FunSuite.
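
To make the gap concrete, here is a rough sketch (not the actual SharedSparkContext or SharedSQLContext code) of a trait that any ScalaTest style can mix in, since it only depends on BeforeAndAfterAll; the trait name and session settings are illustrative:

{code}
// Sketch only: a shared SparkSession mixin that is agnostic to the suite style.
import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterAll, Suite, WordSpec}

trait SharedSparkSessionLike extends BeforeAndAfterAll { self: Suite =>
  @transient protected var spark: SparkSession = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    spark = SparkSession.builder()
      .master("local[2]")
      .appName(suiteName)
      .getOrCreate()
  }

  override def afterAll(): Unit = {
    try if (spark != null) spark.stop()
    finally super.afterAll()
  }
}

// Because it is just a mixin, the same trait also works with FlatSpec, FunSpec, etc.
class RangeSpec extends WordSpec with SharedSparkSessionLike {
  "a shared session" should {
    "be usable from a WordSpec-style test" in {
      assert(spark.range(10).count() == 10)
    }
  }
}
{code}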



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22344) Prevent R CMD check from using /tmp

2017-10-26 Thread Felix Cheung (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220083#comment-16220083
 ] 

Felix Cheung commented on SPARK-22344:
--

Why is hive there when enableHiveSupport should be off?
What do we do with .cache/?

https://stackoverflow.com/questions/76327/how-can-i-prevent-java-from-creating-hsperfdata-files


> Prevent R CMD check from using /tmp
> ---
>
> Key: SPARK-22344
> URL: https://issues.apache.org/jira/browse/SPARK-22344
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 1.6.3, 2.1.2, 2.2.0, 2.3.0
>Reporter: Shivaram Venkataraman
>
> When R CMD check is run on the SparkR package it leaves behind files in /tmp 
> which is a violation of CRAN policy. We should instead write to Rtmpdir. 
> Notes from CRAN are below
> {code}
> Checking this leaves behind dirs
>hive/$USER
>$USER
> and files named like
>b4f6459b-0624-4100-8358-7aa7afbda757_resources
> in /tmp, in violation of the CRAN Policy.
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22344) Prevent R CMD check from using /tmp

2017-10-26 Thread Felix Cheung (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220080#comment-16220080
 ] 

Felix Cheung edited comment on SPARK-22344 at 10/26/17 7:25 AM:


This is what I see on a clean machine while tracking access/create/modify 
operations on the file system.

I run this command:
R CMD check --as-cran SparkR_2.1.2.tar.gz

And, matching the report, these are accessed:
/tmp/hive/
/tmp/hive/ubuntu/8201eb2c-8065-458c-b564-1e61b3dc5b7d/_tmp_space.db/
/tmp/ubuntu/8201eb2c-8065-458c-b564-1e61b3dc5b7d/

But these are also there:
~/.cache
/tmp/hsperfdata_ubuntu/
/tmp/1993ae7a-f553-4de5-9f74-6c8393e3cd5a_resources/
/tmp/8201eb2c-8065-458c-b564-1e61b3dc5b7d_resources/

And this is created and deleted:
/tmp/blockmgr-b27976f3-b66a-44e1-94c2-7360525af321/



was (Author: felixcheung):
This is what I see on a clean machine while tracking access/create/modify 
operations on the file system.

I run this command:
R CMD check --as-cran SparkR_2.1.2.tar.gz

And, matching the report, these are accessed:
/tmp/hive/
/tmp/hive/ubuntu/8201eb2c-8065-458c-b564-1e61b3dc5b7d/_tmp_space.db/
/tmp/ubuntu/8201eb2c-8065-458c-b564-1e61b3dc5b7d/

But these are also there:
~/.cache
/tmp/blockmgr-b27976f3-b66a-44e1-94c2-7360525af321/
/tmp/hsperfdata_ubuntu/
/tmp/1993ae7a-f553-4de5-9f74-6c8393e3cd5a_resources/
/tmp/8201eb2c-8065-458c-b564-1e61b3dc5b7d_resources/


> Prevent R CMD check from using /tmp
> ---
>
> Key: SPARK-22344
> URL: https://issues.apache.org/jira/browse/SPARK-22344
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 1.6.3, 2.1.2, 2.2.0, 2.3.0
>Reporter: Shivaram Venkataraman
>
> When R CMD check is run on the SparkR package it leaves behind files in /tmp 
> which is a violation of CRAN policy. We should instead write to Rtmpdir. 
> Notes from CRAN are below
> {code}
> Checking this leaves behind dirs
>hive/$USER
>$USER
> and files named like
>b4f6459b-0624-4100-8358-7aa7afbda757_resources
> in /tmp, in violation of the CRAN Policy.
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22344) Prevent R CMD check from using /tmp

2017-10-26 Thread Felix Cheung (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220080#comment-16220080
 ] 

Felix Cheung commented on SPARK-22344:
--

This is what I see on a clean machine while tracking access/create/modify 
operations on the file system.

I run this command:
R CMD check --as-cran SparkR_2.1.2.tar.gz

And, matching the report, these are accessed:
/tmp/hive/
/tmp/hive/ubuntu/8201eb2c-8065-458c-b564-1e61b3dc5b7d/_tmp_space.db/
/tmp/ubuntu/8201eb2c-8065-458c-b564-1e61b3dc5b7d/

But these are also there:
~/.cache
/tmp/blockmgr-b27976f3-b66a-44e1-94c2-7360525af321/
/tmp/hsperfdata_ubuntu/
/tmp/1993ae7a-f553-4de5-9f74-6c8393e3cd5a_resources/
/tmp/8201eb2c-8065-458c-b564-1e61b3dc5b7d_resources/


> Prevent R CMD check from using /tmp
> ---
>
> Key: SPARK-22344
> URL: https://issues.apache.org/jira/browse/SPARK-22344
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 1.6.3, 2.1.2, 2.2.0, 2.3.0
>Reporter: Shivaram Venkataraman
>
> When R CMD check is run on the SparkR package it leaves behind files in /tmp 
> which is a violation of CRAN policy. We should instead write to Rtmpdir. 
> Notes from CRAN are below
> {code}
> Checking this leaves behind dirs
>hive/$USER
>$USER
> and files named like
>b4f6459b-0624-4100-8358-7aa7afbda757_resources
> in /tmp, in violation of the CRAN Policy.
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org