[jira] [Commented] (SPARK-27648) In Spark2.4 Structured Streaming:The executor storage memory increasing over time

2019-05-13 Thread tommy duan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16839097#comment-16839097
 ] 

tommy duan commented on SPARK-27648:


Hi [~kabhwan] 

I'm so sorry.

The main code is below; the broadcast updates mentioned above are all business-related.

 
{code:java}
import java.util.HashMap;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent;
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryStartedEvent;
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryTerminatedEvent;

public class Main {
    public static void main(String[] args) throws Exception {
        SparkSession sparkSession = ...; // session construction elided in the original

        // SparkHelper and SparkConfig are the reporter's own helper classes.
        SparkConf conf = SparkHelper.getInstance().getSparkConf();
        String resHDFSPath = SparkConfig.getInstance().getResPath(conf);
        // The map's value type was lost in the original formatting; Object is a placeholder.
        HashMap<String, Object> resouseData =
                new HashMap<>(SparkConfig.getInstance().getRes(conf, sparkSession, resHDFSPath));
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
        Broadcast<HashMap<String, Object>> resBroadcast = jsc.broadcast(resouseData);
        sparkSession.streams().addListener(
                new ResourceLoadListener(sparkSession, resBroadcast, resHDFSPath));

        // The business code
        start(sparkSession);

        sparkSession.streams().awaitAnyTermination();
    }
}

public class ResourceLoadListener extends StreamingQueryListener {
    private SparkSession sparkSession = null;
    private Broadcast<HashMap<String, Object>> resBroadcast = null;
    private String resHDFSPath = "";

    public ResourceLoadListener(SparkSession sparkSession,
                                Broadcast<HashMap<String, Object>> resBroadcast,
                                String resHDFSPath) {
        this.sparkSession = sparkSession;
        this.resBroadcast = resBroadcast;
        this.resHDFSPath = resHDFSPath;
    }

    @Override
    public void onQueryStarted(QueryStartedEvent event) {
    }

    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        try {
            SparkConf conf = SparkHelper.getInstance().getSparkConf();
            HashMap<String, Object> resouseData =
                    new HashMap<>(SparkConfig.getInstance().getRes(conf, sparkSession, resHDFSPath));
            JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());

            // Remove the old broadcast's blocks from the executors, then re-broadcast
            // the freshly loaded resource data.
            resBroadcast.unpersist(true);
            resBroadcast = jsc.broadcast(resouseData);

            System.out.println("reload resource size:" + resBroadcast.getValue().size());
        } catch (Exception e) {
            System.out.println("reload resource error:" + this.resHDFSPath + e);
        }
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) {
    }
}
{code}
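
To see whether these periodic re-broadcasts line up with the growing "Storage Memory" reported for the executors, one option is to log executor block-manager memory from the driver every time the listener fires. A small sketch (Scala for brevity, assuming the same SparkSession; not part of the original code) using {{SparkContext.getExecutorMemoryStatus}}, which reports the maximum and remaining storage memory per executor:
{code}
import org.apache.spark.sql.SparkSession

// Log used/maximum block-manager (storage) memory per executor; could be called
// from onQueryProgress to correlate re-broadcasts with storage-memory growth.
def logExecutorStorage(spark: SparkSession): Unit = {
  spark.sparkContext.getExecutorMemoryStatus.foreach { case (executor, (maxMem, remaining)) =>
    println(s"executor=$executor usedStorageBytes=${maxMem - remaining} maxStorageBytes=$maxMem")
  }
}
{code}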
 

> In Spark2.4 Structured Streaming:The executor storage memory increasing over 
> time
> -
>
> Key: SPARK-27648
> URL: https://issues.apache.org/jira/browse/SPARK-27648
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: tommy duan
>Priority: Major
> Attachments: houragg(1).out, houragg_filter.csv, 
> image-2019-05-09-17-51-14-036.png, image-2019-05-10-17-49-42-034.png
>
>
> *Spark Program Code Business:*
>  Read a topic from Kafka, aggregate the streaming data, and then write the 
> result to another Kafka topic.
> *Problem Description:*
>  *1) Using Spark Structured Streaming in a CDH environment (Spark 2.2)*, memory 
> overflow problems often occurred (too many versions of state were kept in 
> memory; this bug was fixed in Spark 2.4).
> {code:java}
> /spark-submit \
> --conf "spark.yarn.executor.memoryOverhead=4096M" \
> --num-executors 15 \
> --executor-memory 3G \
> --executor-cores 2 \
> --driver-memory 6G
> {code}
> With these submit resources under Spark 2.2, executor memory exceptions occur 
> and the job normally cannot run for more than one day.
> The workaround was to allocate considerably more executor memory than before:
> My spark-submit script is as follows:
> {code:java}
> /spark-submit \
> --conf "spark.yarn.executor.memoryOverhead=4096M" \
> --num-executors 15 \
> --executor-memory 46G \
> --executor-cores 3 \
> --driver-memory 6G \
> ...{code}
> In this case, the spark program can be guaranteed to run stably for a long 
> time, and the executor storage memory is less than 10M (it has been running 
> stably for more than 20 days).
> *2) From the Spark 2.4 upgrade notes, we can see that the problem of large 
> memory consumption by state storage has been solved in Spark 2.4.* 
>  So we upgraded to Spark 2.4 under CDH, ran the Spark program again, and found 
> that memory usage was indeed reduced.
>  But a new problem arises: as the running time increases, the executor storage 
> memory keeps growing (see Executors -> Storage Memory in the Spark UI reached 
> through the YARN ResourceManager).
>  This program has now been running for 14 days (under Spark 2.2, with the same 
> submit resources, it normally could not run for more than one day before 
> executor memory problems occurred).
>  The submit script used under Spark 2.4 is as follows:
> {code:java}
> /spark-submit \
>  --conf "spark.yarn.executor.memoryOverhead=4096M" \
>  --num-executors 15 \
>  --executor-memory 3G \
>  --executor-cores 2 \
>  --driver-memory 6G 
> {code}
> Under Spark 2.4, I counted the size of executor memory as time went by during 
> the 

[jira] [Updated] (SPARK-27694) CTAS created data source table support collect statistics

2019-05-13 Thread Yuming Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-27694:

Summary: CTAS created data source table support collect statistics  (was: 
Create a data source table using the result of a query should update statistics 
if spark.sql.statistics.size.autoUpdate.enabled is enabled)

> CTAS created data source table support collect statistics
> -
>
> Key: SPARK-27694
> URL: https://issues.apache.org/jira/browse/SPARK-27694
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
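
For context, a minimal way to observe the behaviour this ticket targets (a sketch in spark-shell; the table names are made up) is to enable {{spark.sql.statistics.size.autoUpdate.enabled}}, create a table via CTAS, and check whether a Statistics entry is recorded:
{code}
// Enable automatic size-statistics updates, create a table via CTAS,
// then inspect the catalog entry for a Statistics row.
spark.conf.set("spark.sql.statistics.size.autoUpdate.enabled", "true")

spark.range(1000).write.saveAsTable("ctas_src")
spark.sql("CREATE TABLE ctas_dst USING parquet AS SELECT * FROM ctas_src")

// Whether sizeInBytes shows up here for the CTAS-created table is exactly
// what this improvement is about.
spark.sql("DESCRIBE TABLE EXTENDED ctas_dst").show(100, false)
{code}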




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27695) SELECT * returns null column when reading from Hive / ORC and spark.sql.hive.convertMetastoreOrc=true

2019-05-13 Thread Oscar Cassetti (JIRA)
Oscar Cassetti created SPARK-27695:
--

 Summary: SELECT * returns null column when reading from Hive / ORC 
and spark.sql.hive.convertMetastoreOrc=true
 Key: SPARK-27695
 URL: https://issues.apache.org/jira/browse/SPARK-27695
 Project: Spark
  Issue Type: Bug
  Components: Input/Output, Spark Core
Affects Versions: 2.3.3, 2.3.2, 2.3.1
Reporter: Oscar Cassetti


If you do 
{code:java}
select * from hive.some_table{code}
and the underlying data does not exactly match the table schema, the last column is 
returned as null.

Example 
{code:java}
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *

conf = SparkConf().set('spark.sql.hive.convertMetastoreOrc', 'true')

spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()


data = [{'a':i, 'b':i+10, 'd':{'a':i, 'b':i+10}} for i in range(1, 100)]
data_schema = StructType([StructField('a', LongType(), True),
StructField('b', LongType(), True),
StructField('d', MapType(StringType(), LongType(), True), True)
])
rdd = spark.sparkContext.parallelize(data)

df = rdd.toDF(data_schema)

df.write.format("orc").save("./sample_data/")

spark.sql("""create external table tmp(
a bigint,
b bigint,
d map<string,bigint>)
stored as orc
location 'sample_data/'
""")


spark.sql("select * from tmp").show()
{code}
This returns the correct result:

{noformat}
+---+---+---+
|  a|  b|  d|
+---+---+---+
| 85| 95| [a -> 85, b -> 95]|
| 86| 96| [a -> 86, b -> 96]|
| 87| 97| [a -> 87, b -> 97]|
| 88| 98| [a -> 88, b -> 98]|
| 89| 99| [a -> 89, b -> 99]|
| 90|100|[a -> 90, b -> 100]|

{noformat}

However, if you add a new column to the underlying data without altering the Hive 
schema, the last column of the Hive schema is returned as null.

{code}
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *

conf = SparkConf().set('spark.sql.hive.convertMetastoreOrc', 'true')

spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()


data = [{'a':i, 'b':i+10, 'c':i+5, 'd':{'a':i, 'b':i+10, 'c':i+5}} for i in 
range(1, 100)]
data_schema = StructType([StructField('a', LongType(), True),
  StructField('b', LongType(), True),
  StructField('c', LongType(), True),
  StructField('d', MapType(StringType(), 
LongType(), True), True)
  ])
rdd = spark.sparkContext.parallelize(data)

df = rdd.toDF(data_schema)

df.write.format("orc").mode("overwrite").save("./sample_data/")


spark.sql("select * from tmp").show()

spark.read.orc("./sample_data/").show()
{code}

The first show() returns 
{noformat}
+---+---++
|  a|  b|   d|
+---+---++
| 85| 95|null|
| 86| 96|null|
| 87| 97|null|
| 88| 98|null|
| 89| 99|null|
| 90|100|null|
| 91|101|null|
| 92|102|null|
| 93|103|null|
| 94|104|null|
{noformat}

But the data on disk is correct
{noformat}
+---+---+---++
|  a|  b|  c|   d|
+---+---+---++
| 85| 95| 90|[a -> 85, b -> 95...|
| 86| 96| 91|[a -> 86, b -> 96...|
| 87| 97| 92|[a -> 87, b -> 97...|
| 88| 98| 93|[a -> 88, b -> 98...|
| 89| 99| 94|[a -> 89, b -> 99...|
| 90|100| 95|[a -> 90, b -> 10...|
| 91|101| 96|[a -> 91, b -> 10...|
{noformat}
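
A quick way to confirm the schema drift described above (a sketch in Scala, assuming the same {{tmp}} table and {{./sample_data/}} path as the Python repro) is to compare the schema Spark derives from the Hive metastore with the schema read from the ORC files themselves:
{code}
// The table schema comes from the Hive metastore; the file schema comes from
// the ORC footers. With convertMetastoreOrc=true the two are expected to line
// up, and the extra column shows where they diverge.
val tableSchema = spark.table("tmp").schema
val fileSchema  = spark.read.orc("./sample_data/").schema

println(s"table: ${tableSchema.simpleString}")
println(s"files: ${fileSchema.simpleString}")
println(s"only in files: ${fileSchema.fieldNames.diff(tableSchema.fieldNames).mkString(", ")}")
{code}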



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27671) Fix error when casting from a nested null in a struct

2019-05-13 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16839070#comment-16839070
 ] 

Liang-Chi Hsieh commented on SPARK-27671:
-

[~dongjoon] Thanks for testing and for updating `Affects Version/s`. Do we need a 
patch for branch-2.3? If so, please let me know.

> Fix error when casting from a nested null in a struct
> -
>
> Key: SPARK-27671
> URL: https://issues.apache.org/jira/browse/SPARK-27671
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0, 2.4.3
>Reporter: Liang-Chi Hsieh
>Assignee: Liang-Chi Hsieh
>Priority: Major
> Fix For: 2.4.4, 3.0.0
>
>
> When there is a null in a nested field of a struct, casting from the struct 
> currently throws an error.
> {code}
> scala> sql("select cast(struct(1, null) as struct<a:int,b:int>)").show
> scala.MatchError: NullType (of class org.apache.spark.sql.types.NullType$)
>  
>   at org.apache.spark.sql.catalyst.expressions.Cast.castToInt(Cast.scala:447) 
>  
>   at org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:635)  
>  
>   at 
> org.apache.spark.sql.catalyst.expressions.Cast.$anonfun$castStruct$1(Cast.scala:603)
>
> {code}
> {code}
> scala> sql("select * FROM VALUES (('a', (10, null))), (('b', (10, 50))), 
> (('c', null)) AS tab(x, y)").show 
> org.apache.spark.sql.AnalysisException: failed to evaluate expression 
> named_struct('col1', 10, 'col2', NULL): NullType (of class 
> org.apache.spark.sql.t
> ypes.NullType$); line 1 pos 14
>  
>   at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:47)
>  
>   at 
> org.apache.spark.sql.catalyst.analysis.ResolveInlineTables.$anonfun$convert$6(ResolveInlineTables.scala:106)
> 
> {code}
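
Until the fix is released, one workaround (a sketch, not something taken from the ticket) is to give the nested null an explicit type so the struct field is not inferred as NullType:
{code}
// Typing the null up front avoids the NullType field that the cast cannot handle.
scala> sql("select cast(struct(1, cast(null as int)) as struct<a:int,b:int>)").show
{code}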



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27689) Error to execute hive views with spark

2019-05-13 Thread Yuming Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-27689:

Description: 
I get a Python error when I execute the following code using Hive views, but it 
works correctly when I run it with Hive tables.

*Hive databases:*

CREATE DATABASE schema_p LOCATION "hdfs:///tmp/schema_p";

*Hive tables:*

CREATE TABLE schema_p.person(
 id_person string, 
 identifier string, 
 gender string, 
 start_date string, 
 end_date string)
 ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
 STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
 OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
 LOCATION 'hdfs:///tmp/schema_p/person';

CREATE TABLE schema_p.product(
 id_product string,
 name string,
 country string,
 city string,
 start_date string,
 end_date string
 )
 ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
 STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
 OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
 LOCATION 'hdfs:///tmp/schema_p/product';

CREATE TABLE schema_p.person_product(
 id_person string,
 id_product string,
 country string,
 city string,
 price string,
 start_date string,
 end_date string
 )
 ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
 STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
 OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
 LOCATION 'hdfs:///tmp/schema_p/person_product';

*Hive views:*

CREATE VIEW schema_p.person_v AS SELECT CAST(id_person AS INT) AS id_person, 
CAST(identifier AS INT) AS identifier, gender AS gender, CAST(start_date AS 
DATE) AS start_date, CAST(end_date AS DATE) AS end_date FROM schema_p.person;
 CREATE VIEW schema_p.product_v AS SELECT CAST(id_product AS INT) AS 
id_product, name AS name, country AS country, city AS city, CAST(start_date AS 
DATE) AS start_date, CAST(end_date AS DATE) AS end_date FROM schema_p.product;
 CREATE VIEW schema_p.person_product_v AS SELECT CAST(id_person AS INT) AS 
id_person, CAST(id_product AS INT) AS id_product, country AS country, city AS 
city, CAST(price AS DECIMAL(38,8)) AS price, CAST(start_date AS DATE) AS 
start_date, CAST(end_date AS DATE) AS end_date FROM schema_p.person_product;


*Code*:

 
{code:python}
def read_tables(sc):
  in_dict = { 'person': 'person_v', 'product': 'product_v', 'person_product': 
'person_product_v' }
  data_dict = {}
  for n, d in in_dict.iteritems():
data_dict[n] = sc.read.table(d)
  return data_dict

def get_population(tables, ref_date_str):
  person = tables['person']
  product = tables['product']
  person_product = tables['person_product']
  person_product_join = person_product.join(product,'id_product')
  count_prod = 
person_product.groupBy('id_product').agg(F.count('id_product').alias('count_prod'))
  person_count = person_product_join.join(count_prod,'id_product')
  final1 = person_product_join.join(person_count, 'id_person', 'left')
  final = final1.withColumn('reference_date', F.lit(ref_date_str))
  return final

import pyspark.sql.functions as F
import functools
from pyspark.sql.functions import col
from pyspark.sql.functions import add_months, lit, count, coalesce

spark.sql('use schema_p')
data_dict = read_tables(spark)
data_dict

population = get_population(data_dict, '2019-04-30')
population.show(){code}

*Error:*

File "<stdin>", line 1, in <module>
 File "<stdin>", line 10, in get_population
 File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 
931, in join
 jdf = self._jdf.join(other._jdf, on, how)
 File 
"/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py",
 line 1160, in __call__
 File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 69, in 
deco
 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
 pyspark.sql.utils.AnalysisException: u'Resolved attribute(s) 
id_product#124,end_date#129,city#126,price#127,start_date#128,id_person#123,country#125
 missing from 
city#47,price#48,start_date#49,id_product#45,end_date#50,id_person#44,country#46
 in operator !Project [cast(id_person#123 as int) AS id_person#96, 
cast(id_product#124 as int) AS id_product#97, cast(country#125 as string) AS 
country#98, cast(city#126 as string) AS city#99, cast(price#127 as 
decimal(38,8)) AS price#100, cast(start_date#128 as date) AS start_date#101, 
cast(end_date#129 as date) AS end_date#102]. Attribute(s) with the same name 
appear in the 

[jira] [Assigned] (SPARK-27694) Create a data source table using the result of a query should update statistics if spark.sql.statistics.size.autoUpdate.enabled is enabled

2019-05-13 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27694:


Assignee: Apache Spark

> Create a data source table using the result of a query should update 
> statistics if spark.sql.statistics.size.autoUpdate.enabled is enabled
> --
>
> Key: SPARK-27694
> URL: https://issues.apache.org/jira/browse/SPARK-27694
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Assignee: Apache Spark
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27694) Create a data source table using the result of a query should update statistics if spark.sql.statistics.size.autoUpdate.enabled is enabled

2019-05-13 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27694:


Assignee: (was: Apache Spark)

> Create a data source table using the result of a query should update 
> statistics if spark.sql.statistics.size.autoUpdate.enabled is enabled
> --
>
> Key: SPARK-27694
> URL: https://issues.apache.org/jira/browse/SPARK-27694
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27694) Create a data source table using the result of a query should update statistics if spark.sql.statistics.size.autoUpdate.enabled is enabled

2019-05-13 Thread Yuming Wang (JIRA)
Yuming Wang created SPARK-27694:
---

 Summary: Create a data source table using the result of a query 
should update statistics if spark.sql.statistics.size.autoUpdate.enabled is 
enabled
 Key: SPARK-27694
 URL: https://issues.apache.org/jira/browse/SPARK-27694
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Yuming Wang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27690) Remove materialized view first in `HiveClientImpl.reset`

2019-05-13 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-27690:
--
Summary: Remove materialized view first in `HiveClientImpl.reset`  (was: 
Refactor HiveClientImpl#reset() to remove materialized view first )

> Remove materialized view first in `HiveClientImpl.reset`
> 
>
> Key: SPARK-27690
> URL: https://issues.apache.org/jira/browse/SPARK-27690
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> {noformat}
> Cause: 
> org.apache.derby.shared.common.error.DerbySQLIntegrityConstraintViolationException:
>  DELETE on table 'TBLS' caused a violation of foreign key constraint 
> 'MV_TABLES_USED_FK2' for key (4).  The statement has been rolled back.
> at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown 
> Source)
> at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source)
> at 
> org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown 
> Source)
> at org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(Unknown 
> Source)
> at org.apache.derby.impl.jdbc.EmbedConnection.handleException(Unknown Source)
> at org.apache.derby.impl.jdbc.ConnectionChild.handleException(Unknown Source)
> at org.apache.derby.impl.jdbc.EmbedStatement.executeStatement(Unknown Source)
> at 
> org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeBatchElement(Unknown 
> Source)
> at org.apache.derby.impl.jdbc.EmbedStatement.executeLargeBatch(Unknown Source)
> at org.apache.derby.impl.jdbc.EmbedStatement.executeBatch(Unknown 
> Source){noformat}
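
The constraint violation above comes from deleting a base table while a materialized view still references it (the MV_TABLES_USED foreign key), so the reset logic needs to drop materialized views before ordinary tables. A rough sketch of that ordering in Scala, using hypothetical {{listTables}}/{{isMaterializedView}}/{{dropTable}} helpers rather than the real HiveClientImpl methods:
{code}
// Drop materialized views first; otherwise deleting a base table they reference
// violates the metastore's MV_TABLES_USED_FK2 foreign key constraint.
val (matViews, plainTables) = listTables("default").partition(isMaterializedView)
matViews.foreach(name => dropTable("default", name))
plainTables.foreach(name => dropTable("default", name))
{code}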



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27690) Remove materialized views first in `HiveClientImpl.reset`

2019-05-13 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-27690:
--
Summary: Remove materialized views first in `HiveClientImpl.reset`  (was: 
Remove materialized view first in `HiveClientImpl.reset`)

> Remove materialized views first in `HiveClientImpl.reset`
> -
>
> Key: SPARK-27690
> URL: https://issues.apache.org/jira/browse/SPARK-27690
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> {noformat}
> Cause: 
> org.apache.derby.shared.common.error.DerbySQLIntegrityConstraintViolationException:
>  DELETE on table 'TBLS' caused a violation of foreign key constraint 
> 'MV_TABLES_USED_FK2' for key (4).  The statement has been rolled back.
> at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown 
> Source)
> at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source)
> at 
> org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown 
> Source)
> at org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(Unknown 
> Source)
> at org.apache.derby.impl.jdbc.EmbedConnection.handleException(Unknown Source)
> at org.apache.derby.impl.jdbc.ConnectionChild.handleException(Unknown Source)
> at org.apache.derby.impl.jdbc.EmbedStatement.executeStatement(Unknown Source)
> at 
> org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeBatchElement(Unknown 
> Source)
> at org.apache.derby.impl.jdbc.EmbedStatement.executeLargeBatch(Unknown Source)
> at org.apache.derby.impl.jdbc.EmbedStatement.executeBatch(Unknown 
> Source){noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-27682) Avoid use of Scala collection classes that are removed in 2.13

2019-05-13 Thread Stavros Kontopoulos (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stavros Kontopoulos updated SPARK-27682:

Comment: was deleted

(was: Removing .to[...] means creating explicitly the type you needs a bit 
verbose I guess but its the less intrusive option. )

> Avoid use of Scala collection classes that are removed in 2.13
> --
>
> Key: SPARK-27682
> URL: https://issues.apache.org/jira/browse/SPARK-27682
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML, MLlib, Spark Core, SQL
>Affects Versions: 3.0.0
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Minor
>
> Scala 2.13 will remove several collection classes like {{MutableList}}. We 
> should avoid using them and replace with similar classes proactively.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27682) Avoid use of Scala collection classes that are removed in 2.13

2019-05-13 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838885#comment-16838885
 ] 

Stavros Kontopoulos commented on SPARK-27682:
-

Removing .to[...] means creating the target collection type explicitly, which is a 
bit verbose I guess, but it's the least intrusive option. 
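
For illustration (plain Scala, not code from the Spark patch), this is roughly what those replacements look like; the 2.12-only constructs are shown in comments next to forms that compile on both 2.12 and 2.13:
{code}
import scala.collection.mutable

// mutable.MutableList is removed in 2.13; ListBuffer covers the usual
// append-heavy usage on both versions.
val buf = mutable.ListBuffer[Int]()   // instead of mutable.MutableList[Int]()
buf += 1
buf += 2

// The CanBuildFrom-based xs.to[Vector] does not cross-compile; constructing the
// target collection explicitly (or using a dedicated converter) does.
val xs = Seq(1, 2, 3)
val vec = Vector(xs: _*)              // instead of xs.to[Vector]
val arr = xs.toArray                  // dedicated converters also work on both
{code}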

> Avoid use of Scala collection classes that are removed in 2.13
> --
>
> Key: SPARK-27682
> URL: https://issues.apache.org/jira/browse/SPARK-27682
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML, MLlib, Spark Core, SQL
>Affects Versions: 3.0.0
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Minor
>
> Scala 2.13 will remove several collection classes like {{MutableList}}. We 
> should avoid using them and replace with similar classes proactively.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27693) DataSourceV2: Add default catalog property

2019-05-13 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27693:


Assignee: Apache Spark

> DataSourceV2: Add default catalog property
> --
>
> Key: SPARK-27693
> URL: https://issues.apache.org/jira/browse/SPARK-27693
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Ryan Blue
>Assignee: Apache Spark
>Priority: Major
>
> Add a default catalog property for DataSourceV2.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27693) DataSourceV2: Add default catalog property

2019-05-13 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27693:


Assignee: (was: Apache Spark)

> DataSourceV2: Add default catalog property
> --
>
> Key: SPARK-27693
> URL: https://issues.apache.org/jira/browse/SPARK-27693
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Ryan Blue
>Priority: Major
>
> Add a default catalog property for DataSourceV2.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27693) DataSourceV2: Add default catalog property

2019-05-13 Thread Ryan Blue (JIRA)
Ryan Blue created SPARK-27693:
-

 Summary: DataSourceV2: Add default catalog property
 Key: SPARK-27693
 URL: https://issues.apache.org/jira/browse/SPARK-27693
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.3
Reporter: Ryan Blue


Add a default catalog property for DataSourceV2.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27671) Fix error when casting from a nested null in a struct

2019-05-13 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-27671.
---
   Resolution: Fixed
 Assignee: Liang-Chi Hsieh
Fix Version/s: 3.0.0
   2.4.4

This is resolved via https://github.com/apache/spark/pull/24576

> Fix error when casting from a nested null in a struct
> -
>
> Key: SPARK-27671
> URL: https://issues.apache.org/jira/browse/SPARK-27671
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0, 2.4.3
>Reporter: Liang-Chi Hsieh
>Assignee: Liang-Chi Hsieh
>Priority: Major
> Fix For: 2.4.4, 3.0.0
>
>
> When there is a null in a nested field of a struct, casting from the struct 
> currently throws an error.
> {code}
> scala> sql("select cast(struct(1, null) as struct<a:int,b:int>)").show
> scala.MatchError: NullType (of class org.apache.spark.sql.types.NullType$)
>  
>   at org.apache.spark.sql.catalyst.expressions.Cast.castToInt(Cast.scala:447) 
>  
>   at org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:635)  
>  
>   at 
> org.apache.spark.sql.catalyst.expressions.Cast.$anonfun$castStruct$1(Cast.scala:603)
>
> {code}
> {code}
> scala> sql("select * FROM VALUES (('a', (10, null))), (('b', (10, 50))), 
> (('c', null)) AS tab(x, y)").show 
> org.apache.spark.sql.AnalysisException: failed to evaluate expression 
> named_struct('col1', 10, 'col2', NULL): NullType (of class 
> org.apache.spark.sql.t
> ypes.NullType$); line 1 pos 14
>  
>   at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:47)
>  
>   at 
> org.apache.spark.sql.catalyst.analysis.ResolveInlineTables.$anonfun$convert$6(ResolveInlineTables.scala:106)
> 
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27692) Optimize evaluation of udf that is deterministic and has literal inputs

2019-05-13 Thread Sunitha Kambhampati (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838812#comment-16838812
 ] 

Sunitha Kambhampati commented on SPARK-27692:
-

Here is the PR: https://github.com/apache/spark/pull/24593

> Optimize evaluation of udf that is deterministic and has literal inputs
> ---
>
> Key: SPARK-27692
> URL: https://issues.apache.org/jira/browse/SPARK-27692
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Sunitha Kambhampati
>Priority: Major
>
> Deterministic UDF is a udf for which the following is true:  Given a specific 
> input, the output of the udf will be the same no matter how many times you 
> execute the udf.
> When your inputs to the UDF are all literal and UDF is deterministic, we can 
> optimize this to evaluate the udf once and use the output instead of 
> evaluating the UDF each time for every row in the query. 
> This is valid only if the UDF is deterministic and inputs are literal.  
> Otherwise we should not and cannot apply this optimization. 
> *Testing:* 
> We have used this internally and have seen significant performance 
> improvements for some very expensive UDFs ( as expected).
> In the PR, I have added unit tests. 
> *Credits:* 
> Thanks to Guy Khazma([https://github.com/guykhazma]) from the IBM Haifa 
> Research Team for the idea and the original implementation. 
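
As a concrete illustration of the optimization (a sketch in spark-shell, not the code from the PR): when a deterministic UDF is applied only to literal inputs, its result is the same for every row, so it can be evaluated once and substituted as a constant.
{code}
import org.apache.spark.sql.functions.{lit, udf}

// An expensive but deterministic function of its inputs only.
val slowSquare = udf((x: Int) => { Thread.sleep(10); x * x })

val df = spark.range(1000000).toDF("id")

// Without the optimization this evaluates the UDF once per row, even though
// both the UDF and its input are constant across rows...
val perRow = df.withColumn("c", slowSquare(lit(7)))

// ...which is semantically equivalent to evaluating it once up front, the
// rewrite the proposed rule would perform automatically.
val folded = df.withColumn("c", lit(49))
{code}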



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27692) Optimize evaluation of udf that is deterministic and has literal inputs

2019-05-13 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27692:


Assignee: (was: Apache Spark)

> Optimize evaluation of udf that is deterministic and has literal inputs
> ---
>
> Key: SPARK-27692
> URL: https://issues.apache.org/jira/browse/SPARK-27692
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Sunitha Kambhampati
>Priority: Major
>
> Deterministic UDF is a udf for which the following is true:  Given a specific 
> input, the output of the udf will be the same no matter how many times you 
> execute the udf.
> When your inputs to the UDF are all literal and UDF is deterministic, we can 
> optimize this to evaluate the udf once and use the output instead of 
> evaluating the UDF each time for every row in the query. 
> This is valid only if the UDF is deterministic and inputs are literal.  
> Otherwise we should not and cannot apply this optimization. 
> *Testing:* 
> We have used this internally and have seen significant performance 
> improvements for some very expensive UDFs ( as expected).
> In the PR, I have added unit tests. 
> *Credits:* 
> Thanks to Guy Khazma([https://github.com/guykhazma]) from the IBM Haifa 
> Research Team for the idea and the original implementation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27692) Optimize evaluation of udf that is deterministic and has literal inputs

2019-05-13 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27692:


Assignee: Apache Spark

> Optimize evaluation of udf that is deterministic and has literal inputs
> ---
>
> Key: SPARK-27692
> URL: https://issues.apache.org/jira/browse/SPARK-27692
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Sunitha Kambhampati
>Assignee: Apache Spark
>Priority: Major
>
> Deterministic UDF is a udf for which the following is true:  Given a specific 
> input, the output of the udf will be the same no matter how many times you 
> execute the udf.
> When your inputs to the UDF are all literal and UDF is deterministic, we can 
> optimize this to evaluate the udf once and use the output instead of 
> evaluating the UDF each time for every row in the query. 
> This is valid only if the UDF is deterministic and inputs are literal.  
> Otherwise we should not and cannot apply this optimization. 
> *Testing:* 
> We have used this internally and have seen significant performance 
> improvements for some very expensive UDFs ( as expected).
> In the PR, I have added unit tests. 
> *Credits:* 
> Thanks to Guy Khazma([https://github.com/guykhazma]) from the IBM Haifa 
> Research Team for the idea and the original implementation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27692) Optimize evaluation of udf that is deterministic and has literal inputs

2019-05-13 Thread Sunitha Kambhampati (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838803#comment-16838803
 ] 

Sunitha Kambhampati commented on SPARK-27692:
-

I'll open a PR soon. 

> Optimize evaluation of udf that is deterministic and has literal inputs
> ---
>
> Key: SPARK-27692
> URL: https://issues.apache.org/jira/browse/SPARK-27692
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Sunitha Kambhampati
>Priority: Major
>
> Deterministic UDF is a udf for which the following is true:  Given a specific 
> input, the output of the udf will be the same no matter how many times you 
> execute the udf.
> When your inputs to the UDF are all literal and UDF is deterministic, we can 
> optimize this to evaluate the udf once and use the output instead of 
> evaluating the UDF each time for every row in the query. 
> This is valid only if the UDF is deterministic and inputs are literal.  
> Otherwise we should not and cannot apply this optimization. 
> *Testing:* 
> We have used this internally and have seen significant performance 
> improvements for some very expensive UDFs ( as expected).
> In the PR, I have added unit tests. 
> *Credits:* 
> Thanks to Guy Khazma([https://github.com/guykhazma]) from the IBM Haifa 
> Research Team for the idea and the original implementation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27692) Optimize evaluation of udf that is deterministic and has literal inputs

2019-05-13 Thread Sunitha Kambhampati (JIRA)
Sunitha Kambhampati created SPARK-27692:
---

 Summary: Optimize evaluation of udf that is deterministic and has 
literal inputs
 Key: SPARK-27692
 URL: https://issues.apache.org/jira/browse/SPARK-27692
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Sunitha Kambhampati


Deterministic UDF is a udf for which the following is true:  Given a specific 
input, the output of the udf will be the same no matter how many times you 
execute the udf.

When your inputs to the UDF are all literal and UDF is deterministic, we can 
optimize this to evaluate the udf once and use the output instead of evaluating 
the UDF each time for every row in the query. 

This is valid only if the UDF is deterministic and inputs are literal.  
Otherwise we should not and cannot apply this optimization. 

*Testing:* 

We have used this internally and have seen significant performance improvements 
for some very expensive UDFs ( as expected).

In the PR, I have added unit tests. 

*Credits:* 

Thanks to Guy Khazma([https://github.com/guykhazma]) from the IBM Haifa 
Research Team for the idea and the original implementation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27681) Use scala.collection.Seq explicitly instead of scala.Seq alias

2019-05-13 Thread Marcelo Vanzin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838799#comment-16838799
 ] 

Marcelo Vanzin commented on SPARK-27681:


bq. The case to consider is, roughly, where Seq is an argument not a return 
type.

If this change is restricted to that it's probably fine. But it's still a 
tricky change. You'll probably have to explicitly use that class in the problem 
spots (i.e. no importing at the top level or it may change things you don't 
want to change). Then people will have to remember to do that (or reviewers 
remember to catch it during review). It just feels a little too brittle.

> Use scala.collection.Seq explicitly instead of scala.Seq alias
> --
>
> Key: SPARK-27681
> URL: https://issues.apache.org/jira/browse/SPARK-27681
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML, MLlib, Spark Core, SQL, Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Major
>
> {{scala.Seq}} is widely used in the code, and is an alias for 
> {{scala.collection.Seq}} in Scala 2.12. It will become an alias for 
> {{scala.collection.immutable.Seq}} in Scala 2.13. In many cases, this will be 
> fine, as Spark users using Scala 2.13 will also have this changed alias. In 
> some cases it may be undesirable, as it will cause some code to compile in 
> 2.12 but not in 2.13. In some cases, making the type {{scala.collection.Seq}} 
> explicit so that it doesn't vary can help avoid this, so that Spark apps 
> might cross-compile for 2.12 and 2.13 with the same source.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27671) Fix error when casting from a nested null in a struct

2019-05-13 Thread Dongjoon Hyun (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838795#comment-16838795
 ] 

Dongjoon Hyun commented on SPARK-27671:
---

The patch can land on `master` and `branch-2.4`. For branch-2.3 we would need 
additional patches, so backporting is not feasible.

> Fix error when casting from a nested null in a struct
> -
>
> Key: SPARK-27671
> URL: https://issues.apache.org/jira/browse/SPARK-27671
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0, 2.4.3
>Reporter: Liang-Chi Hsieh
>Priority: Major
>
> When there is a null in a nested field of a struct, casting from the struct 
> currently throws an error.
> {code}
> scala> sql("select cast(struct(1, null) as struct<a:int,b:int>)").show
> scala.MatchError: NullType (of class org.apache.spark.sql.types.NullType$)
>  
>   at org.apache.spark.sql.catalyst.expressions.Cast.castToInt(Cast.scala:447) 
>  
>   at org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:635)  
>  
>   at 
> org.apache.spark.sql.catalyst.expressions.Cast.$anonfun$castStruct$1(Cast.scala:603)
>
> {code}
> {code}
> scala> sql("select * FROM VALUES (('a', (10, null))), (('b', (10, 50))), 
> (('c', null)) AS tab(x, y)").show 
> org.apache.spark.sql.AnalysisException: failed to evaluate expression 
> named_struct('col1', 10, 'col2', NULL): NullType (of class 
> org.apache.spark.sql.t
> ypes.NullType$); line 1 pos 14
>  
>   at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:47)
>  
>   at 
> org.apache.spark.sql.catalyst.analysis.ResolveInlineTables.$anonfun$convert$6(ResolveInlineTables.scala:106)
> 
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27671) Fix error when casting from a nested null in a struct

2019-05-13 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-27671:
--
Affects Version/s: 2.4.3

> Fix error when casting from a nested null in a struct
> -
>
> Key: SPARK-27671
> URL: https://issues.apache.org/jira/browse/SPARK-27671
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0, 2.4.3
>Reporter: Liang-Chi Hsieh
>Priority: Major
>
> When there is a null in a nested field of a struct, casting from the struct 
> currently throws an error.
> {code}
> scala> sql("select cast(struct(1, null) as struct<a:int,b:int>)").show
> scala.MatchError: NullType (of class org.apache.spark.sql.types.NullType$)
>  
>   at org.apache.spark.sql.catalyst.expressions.Cast.castToInt(Cast.scala:447) 
>  
>   at org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:635)  
>  
>   at 
> org.apache.spark.sql.catalyst.expressions.Cast.$anonfun$castStruct$1(Cast.scala:603)
>
> {code}
> {code}
> scala> sql("select * FROM VALUES (('a', (10, null))), (('b', (10, 50))), 
> (('c', null)) AS tab(x, y)").show 
> org.apache.spark.sql.AnalysisException: failed to evaluate expression 
> named_struct('col1', 10, 'col2', NULL): NullType (of class 
> org.apache.spark.sql.t
> ypes.NullType$); line 1 pos 14
>  
>   at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:47)
>  
>   at 
> org.apache.spark.sql.catalyst.analysis.ResolveInlineTables.$anonfun$convert$6(ResolveInlineTables.scala:106)
> 
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27671) Fix error when casting from a nested null in a struct

2019-05-13 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-27671:
--
Summary: Fix error when casting from a nested null in a struct  (was: 
Analysis exception thrown when casting from a nested null in a struct)

> Fix error when casting from a nested null in a struct
> -
>
> Key: SPARK-27671
> URL: https://issues.apache.org/jira/browse/SPARK-27671
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Liang-Chi Hsieh
>Priority: Major
>
> When there is a null in a nested field of a struct, casting from the struct 
> currently throws an error.
> {code}
> scala> sql("select cast(struct(1, null) as struct<a:int,b:int>)").show
> scala.MatchError: NullType (of class org.apache.spark.sql.types.NullType$)
>  
>   at org.apache.spark.sql.catalyst.expressions.Cast.castToInt(Cast.scala:447) 
>  
>   at org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:635)  
>  
>   at 
> org.apache.spark.sql.catalyst.expressions.Cast.$anonfun$castStruct$1(Cast.scala:603)
>
> {code}
> {code}
> scala> sql("select * FROM VALUES (('a', (10, null))), (('b', (10, 50))), 
> (('c', null)) AS tab(x, y)").show 
> org.apache.spark.sql.AnalysisException: failed to evaluate expression 
> named_struct('col1', 10, 'col2', NULL): NullType (of class 
> org.apache.spark.sql.t
> ypes.NullType$); line 1 pos 14
>  
>   at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:47)
>  
>   at 
> org.apache.spark.sql.catalyst.analysis.ResolveInlineTables.$anonfun$convert$6(ResolveInlineTables.scala:106)
> 
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27681) Use scala.collection.Seq explicitly instead of scala.Seq alias

2019-05-13 Thread Sean Owen (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838781#comment-16838781
 ] 

Sean Owen commented on SPARK-27681:
---

You're right, the change is somewhat breaking! We can just accept that, or 
explicitly patch over the breaking change where we think it's important. I agree 
that using the convenient Seq alias is just going to keep happening, but that 
doesn't mean we can't be explicit in the cases where it matters enough, in a 
public API here or there.

(We won't be able to avoid fixing Spark methods that need to return a 
non-immutable Seq and return scala.Seq now, so there's at least that.)

Yep your example is just what we don't want to do. It just breaks things. The 
case to consider is, roughly, where Seq is an argument not a return type.

Yes, mutable and immutable Seqs have always been separate subtypes of 
scala.collection.Seq, and neither is or will be assignable to the other. Your 
example works in 2.12 but not in 2.13. But scala.collection.Seq is the parent 
of both mutable and immutable Seqs.

> Use scala.collection.Seq explicitly instead of scala.Seq alias
> --
>
> Key: SPARK-27681
> URL: https://issues.apache.org/jira/browse/SPARK-27681
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML, MLlib, Spark Core, SQL, Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Major
>
> {{scala.Seq}} is widely used in the code, and is an alias for 
> {{scala.collection.Seq}} in Scala 2.12. It will become an alias for 
> {{scala.collection.immutable.Seq}} in Scala 2.13. In many cases, this will be 
> fine, as Spark users using Scala 2.13 will also have this changed alias. In 
> some cases it may be undesirable, as it will cause some code to compile in 
> 2.12 but not in 2.13. In some cases, making the type {{scala.collection.Seq}} 
> explicit so that it doesn't vary can help avoid this, so that Spark apps 
> might cross-compile for 2.12 and 2.13 with the same source.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-21367) R older version of Roxygen2 on Jenkins

2019-05-13 Thread shane knapp (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-21367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shane knapp closed SPARK-21367.
---

> R older version of Roxygen2 on Jenkins
> --
>
> Key: SPARK-21367
> URL: https://issues.apache.org/jira/browse/SPARK-21367
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.3.0
>Reporter: Felix Cheung
>Assignee: shane knapp
>Priority: Major
> Attachments: R.paks
>
>
> Getting this message from a recent build.
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79461/console
> Warning messages:
> 1: In check_dep_version(pkg, version, compare) :
>   Need roxygen2 >= 5.0.0 but loaded version is 4.1.1
> 2: In check_dep_version(pkg, version, compare) :
>   Need roxygen2 >= 5.0.0 but loaded version is 4.1.1
> * installing *source* package 'SparkR' ...
> ** R
> We have been running with 5.0.1 and haven't changed for a year.
> NOTE: Roxygen 6.x has some big changes and IMO we should not move to that yet.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21367) R older version of Roxygen2 on Jenkins

2019-05-13 Thread Dongjoon Hyun (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838771#comment-16838771
 ] 

Dongjoon Hyun commented on SPARK-21367:
---

Thank you, [~shaneknapp]!

> R older version of Roxygen2 on Jenkins
> --
>
> Key: SPARK-21367
> URL: https://issues.apache.org/jira/browse/SPARK-21367
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.3.0
>Reporter: Felix Cheung
>Assignee: shane knapp
>Priority: Major
> Attachments: R.paks
>
>
> Getting this message from a recent build.
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79461/console
> Warning messages:
> 1: In check_dep_version(pkg, version, compare) :
>   Need roxygen2 >= 5.0.0 but loaded version is 4.1.1
> 2: In check_dep_version(pkg, version, compare) :
>   Need roxygen2 >= 5.0.0 but loaded version is 4.1.1
> * installing *source* package 'SparkR' ...
> ** R
> We have been running with 5.0.1 and haven't changed for a year.
> NOTE: Roxygen 6.x has some big changes and IMO we should not move to that yet.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27681) Use scala.collection.Seq explicitly instead of scala.Seq alias

2019-05-13 Thread Marcelo Vanzin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838766#comment-16838766
 ] 

Marcelo Vanzin commented on SPARK-27681:


bq. But, user code that passes a non-immutable Seq to a Spark API that accepts 
scala.Seq now will no longer compile in 2.13.

That's true, but isn't that what the Scala developers intend anyway? That will 
be true for any Scala code, not just Spark. That's assuming they didn't add any 
implicit conversion from a mutable Seq to an immutable one, which would solve 
that problem.

My problem with your suggestion is that now Spark developers will have to 
always remember to import that different {{Seq}} type. And if they don't 
remember, probably nothing will break until it's too late to notice. It's a 
counter-intuitive change for developers, and I in particular am not seeing a 
lot of benefits from it.

Here's an example of your proposed change that would break user code (just ran 
it on Scala 2.13-RC1):

{code}
scala> def foo(): scala.collection.Seq[String] = Nil
foo: ()scala.collection.Seq[String]

scala> val s: Seq[String] = foo()
   ^
   error: type mismatch;
found   : Seq[String] (in scala.collection)
required: Seq[String] (in scala.collection.immutable)
{code}

So, aside from Spark developers having to remember to use the different {{Seq}} 
type, user code might also have to change so that their internal APIs also use 
the different type, or things like the above may occur.

BTW I also checked and there's no automatic promotion from mutable to immutable 
seqs:

{code}
scala> val s: Seq[String] = scala.collection.mutable.ArrayBuffer[String]()
^
   error: type mismatch;
found   : scala.collection.mutable.ArrayBuffer[String]
required: Seq[String]
{code}

So I sort of understand your desire to keep things more similar, but I'm not 
really seeing the advantages you see.

> Use scala.collection.Seq explicitly instead of scala.Seq alias
> --
>
> Key: SPARK-27681
> URL: https://issues.apache.org/jira/browse/SPARK-27681
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML, MLlib, Spark Core, SQL, Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Major
>
> {{scala.Seq}} is widely used in the code, and is an alias for 
> {{scala.collection.Seq}} in Scala 2.12. It will become an alias for 
> {{scala.collection.immutable.Seq}} in Scala 2.13. In many cases, this will be 
> fine, as Spark users using Scala 2.13 will also have this changed alias. In 
> some cases it may be undesirable, as it will cause some code to compile in 
> 2.12 but not in 2.13. In some cases, making the type {{scala.collection.Seq}} 
> explicit so that it doesn't vary can help avoid this, so that Spark apps 
> might cross-compile for 2.12 and 2.13 with the same source.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27681) Use scala.collection.Seq explicitly instead of scala.Seq alias

2019-05-13 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-27681:
--
Description: {{scala.Seq}} is widely used in the code, and is an alias for 
{{scala.collection.Seq}} in Scala 2.12. It will become an alias for 
{{scala.collection.immutable.Seq}} in Scala 2.13. In many cases, this will be 
fine, as Spark users using Scala 2.13 will also have this changed alias. In 
some cases it may be undesirable, as it will cause some code to compile in 2.12 
but not in 2.13. In some cases, making the type {{scala.collection.Seq}} 
explicit so that it doesn't vary can help avoid this, so that Spark apps might 
cross-compile for 2.12 and 2.13 with the same source.  (was: {{scala.Seq}} is 
widely used in the code, and is an alias for {{scala.collection.Seq}} in Scala 
2.12. It will become an alias for {{scala.collection.immutable.Seq}} in Scala 
2.13. To avoid API changes, we should simply explicit import and use 
{{scala.collection.Seq}}.)

> Use scala.collection.Seq explicitly instead of scala.Seq alias
> --
>
> Key: SPARK-27681
> URL: https://issues.apache.org/jira/browse/SPARK-27681
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML, MLlib, Spark Core, SQL, Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Major
>
> {{scala.Seq}} is widely used in the code, and is an alias for 
> {{scala.collection.Seq}} in Scala 2.12. It will become an alias for 
> {{scala.collection.immutable.Seq}} in Scala 2.13. In many cases, this will be 
> fine, as Spark users using Scala 2.13 will also have this changed alias. In 
> some cases it may be undesirable, as it will cause some code to compile in 
> 2.12 but not in 2.13. In some cases, making the type {{scala.collection.Seq}} 
> explicit so that it doesn't vary can help avoid this, so that Spark apps 
> might cross-compile for 2.12 and 2.13 with the same source.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27681) Use scala.collection.Seq explicitly instead of scala.Seq alias

2019-05-13 Thread Sean Owen (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838744#comment-16838744
 ] 

Sean Owen commented on SPARK-27681:
---

OK, take it down a notch; we are largely in agreement, but there's a little 
more to it.

Right, so if all Spark's {{scala.Seq}} in APIs switch their implementation, 
then so do callers' {{scala.Seq}}, so who cares? The problem is that Spark, and 
user code, _do_ sometimes use Seqs that aren't known to be immutable. Your 
point is specifically that {{def foo(): scala.Seq}} and {{val a: scala.Seq = 
foo()}} work fine in both 2.12 and 2.13, but after changing to {{def foo(): 
scala.collection.Seq}}, that assignment compiles in 2.12 but not in 2.13, unless 
the caller writes the type more explicitly. Yeah, let's avoid that.

There's a mirror-image issue. A Spark method that returns a non-immutable Seq 
as {{scala.Seq}} works now but won't compile in 2.13. OK, but that's Spark's 
problem; we either change the Spark API or change its implementation, though I 
doubt it'll be possible in all cases.

But user code that passes a non-immutable Seq to a Spark API that accepts 
{{scala.Seq}} today will no longer compile in 2.13. That is, with {{def foo(a: 
scala.Seq)}}, callers can pass any Seq in 2.12, but passing a non-immutable 
{{scala.collection.Seq}} fails to compile in 2.13; whereas {{def foo(a: 
scala.collection.Seq)}} continues to accept both.

Fixing the type to {{scala.collection.Seq}} isn't so much about making sure 
it's the same type in 2.12 and 2.13 as about pinning it at a type that can 
still accept any Seq in 2.13, as it already does today. The cases that may have 
to change are the covariant or the contravariant ones, I forget which is which. 
Definitely not all of them.

Which ones? I'd punt that to later, but I think it'll be method arguments that 
are Seqs, and Spark methods whose return types don't otherwise compile in 2.13.

Oh, and the description definitely has to be updated, I'll do that.
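
For illustration, a minimal self-contained sketch of the argument-position case 
above (the object and method names here are made up, not Spark APIs):

{code:java}
import scala.collection.mutable.ArrayBuffer

object SeqAliasSketch {
  // Declared against the alias: scala.Seq means scala.collection.Seq on 2.12
  // but scala.collection.immutable.Seq on 2.13.
  def sumAliased(xs: Seq[Int]): Int = xs.sum

  // Declared against the explicit supertype, which is the same type on both versions.
  def sumExplicit(xs: scala.collection.Seq[Int]): Int = xs.sum

  def main(args: Array[String]): Unit = {
    val buf = ArrayBuffer(1, 2, 3) // a Seq that is not known to be immutable

    // sumAliased(buf) // compiles on 2.12, fails to compile on 2.13

    println(sumExplicit(buf)) // compiles and prints 6 on both 2.12 and 2.13
  }
}
{code}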

> Use scala.collection.Seq explicitly instead of scala.Seq alias
> --
>
> Key: SPARK-27681
> URL: https://issues.apache.org/jira/browse/SPARK-27681
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML, MLlib, Spark Core, SQL, Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Major
>
> {{scala.Seq}} is widely used in the code, and is an alias for 
> {{scala.collection.Seq}} in Scala 2.12. It will become an alias for 
> {{scala.collection.immutable.Seq}} in Scala 2.13. To avoid API changes, we 
> should simply explicit import and use {{scala.collection.Seq}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27402) Fix hadoop-3.2 test issue(except the hive-thriftserver module)

2019-05-13 Thread Xiao Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-27402.
-
   Resolution: Fixed
 Assignee: Yuming Wang
Fix Version/s: 3.0.0

> Fix hadoop-3.2 test issue(except the hive-thriftserver module)
> --
>
> Key: SPARK-27402
> URL: https://issues.apache.org/jira/browse/SPARK-27402
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Assignee: Yuming Wang
>Priority: Major
> Fix For: 3.0.0
>
>
> Fix sql/core and sql/hive modules test issue for hadoop-3.2



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27691) Issue when running queries using filter predicates on pandas GROUPED_AGG udafs

2019-05-13 Thread Michael Tong (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Tong updated SPARK-27691:
-
Description: 
I am currently running pyspark 2.4.2 and am unable to run the following code.

 
{code:java}
from pyspark.sql import functions, types
import pandas as pd
import random

# initialize test data
test_data = [[False, int(random.random() * 2)] for i in range(1)]
test_data = pd.DataFrame(test_data, columns=['bool_value', 'int_value'])

# pandas udaf
pandas_any_udf = functions.pandas_udf(lambda x: x.any(), types.BooleanType(), 
functions.PandasUDFType.GROUPED_AGG)

# create spark DataFrame and build the query
test_df = spark.createDataFrame(test_data)
test_df = 
test_df.groupby('int_value').agg(pandas_any_udf('bool_value').alias('bool_any_result'))
test_df = test_df.filter(functions.col('bool_any_result') == True)

# write to output
test_df.write.parquet('/tmp/mtong/write_test', mode='overwrite')
{code}
 

Below is a truncated error message.

 
{code:java}
Py4JJavaError: An error occurred while calling o1125.parquet. : 
org.apache.spark.SparkException: Job aborted.

...

Exchange hashpartitioning(int_value#123L, 2000)
+- *(1) Filter ((bool_value#122) = true)
   +- Scan ExistingRDD arrow[bool_value#122,int_value#123L]

...

Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: 
(input[0, boolean, true]){code}
 

 

What appears to be happening is that the query optimizer incorrectly pushes the 
filter predicate on bool_any_result down below the group by operation, so it is 
evaluated before the aggregation. This causes the query to error out before 
Spark attempts to execute it. I have also tried running a variant of this query 
with functions.count() as the aggregation function and it ran fine, so I believe 
this is an error that only affects pandas udafs.

 

Variant of query with standard aggregation function
{code:java}
test_df = spark.createDataFrame(test_data)
test_df = 
test_df.groupby('int_value').agg(functions.count('bool_value').alias('bool_counts'))
test_df = test_df.filter(functions.col('bool_counts') > 0)
{code}
 

 

 

  was:
Am currently running pyspark 2.4.2 and I am currently unable to run the 
following code.

 
{code:java}
from pyspark.sql import functions, types
import pandas as pd
import random

# initialize test data
test_data = [[False, int(random.random() * 2)] for i in range(1)]
test_data = pd.DataFrame(test_data, columns=['bool_value', 'int_value'])

# pandas udf
pandas_any_udf = functions.pandas_udf(lambda x: x.any(), types.BooleanType(), 
functions.PandasUDFType.GROUPED_AGG)

# create spark DataFrame and build the query
test_df = spark.createDataFrame(test_data)
test_df = 
test_df.groupby('int_value').agg(pandas_any_udf('bool_value').alias('bool_any_result'))
test_df = test_df.filter(functions.col('bool_any_result') == True)

# write to output
test_df.write.parquet('/tmp/mtong/write_test', mode='overwrite')
{code}
 

Below is a truncated error message.

 
{code:java}
Py4JJavaError: An error occurred while calling o1125.parquet. : 
org.apache.spark.SparkException: Job aborted.

...

Exchange hashpartitioning(int_value#123L, 2000)
+- *(1) Filter ((bool_value#122) = true)
   +- Scan ExistingRDD arrow[bool_value#122,int_value#123L]

...

Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: 
(input[0, boolean, true]){code}
 

 

What appears to be happening is that the query optimizer incorrectly pushes up 
the filter predicate on bool_any_result before the group by operation. This 
causes the query to error out before spark attempts to execute the query. I 
have also tried running a variant of this query with functions.count() as the 
aggregation function and the query ran fine, so I believe that this is an error 
that only affects pandas udfs.

 

Variant of query with standard aggregation function
{code:java}
test_df = spark.createDataFrame(test_data)
test_df = 
test_df.groupby('int_value').agg(functions.count('bool_value').alias('bool_counts'))
test_df = test_df.filter(functions.col('bool_counts') > 0)
{code}
 

 

 


> Issue when running queries using filter predicates on pandas GROUPED_AGG udafs
> --
>
> Key: SPARK-27691
> URL: https://issues.apache.org/jira/browse/SPARK-27691
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.4.2
>Reporter: Michael Tong
>Priority: Major
>
> I am currently running pyspark 2.4.2 and am unable to run the following code.
>  
> {code:java}
> from pyspark.sql import functions, types
> import pandas as pd
> import random
> # initialize test data
> test_data = [[False, int(random.random() * 2)] for i in range(1)]
> test_data = pd.DataFrame(test_data, columns=['bool_value', 'int_value'])

[jira] [Updated] (SPARK-27691) Issue when running queries using filter predicates on pandas GROUPED_AGG udafs

2019-05-13 Thread Michael Tong (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Tong updated SPARK-27691:
-
Summary: Issue when running queries using filter predicates on pandas 
GROUPED_AGG udafs  (was: Issue when running queries using filter predicates on 
pandas GROUPED_AGG udfs)

> Issue when running queries using filter predicates on pandas GROUPED_AGG udafs
> --
>
> Key: SPARK-27691
> URL: https://issues.apache.org/jira/browse/SPARK-27691
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.4.2
>Reporter: Michael Tong
>Priority: Major
>
> I am currently running pyspark 2.4.2 and am unable to run the following code.
>  
> {code:java}
> from pyspark.sql import functions, types
> import pandas as pd
> import random
> # initialize test data
> test_data = [[False, int(random.random() * 2)] for i in range(1)]
> test_data = pd.DataFrame(test_data, columns=['bool_value', 'int_value'])
> # pandas udf
> pandas_any_udf = functions.pandas_udf(lambda x: x.any(), types.BooleanType(), 
> functions.PandasUDFType.GROUPED_AGG)
> # create spark DataFrame and build the query
> test_df = spark.createDataFrame(test_data)
> test_df = 
> test_df.groupby('int_value').agg(pandas_any_udf('bool_value').alias('bool_any_result'))
> test_df = test_df.filter(functions.col('bool_any_result') == True)
> # write to output
> test_df.write.parquet('/tmp/mtong/write_test', mode='overwrite')
> {code}
>  
> Below is a truncated error message.
>  
> {code:java}
> Py4JJavaError: An error occurred while calling o1125.parquet. : 
> org.apache.spark.SparkException: Job aborted.
> ...
> Exchange hashpartitioning(int_value#123L, 2000)
> +- *(1) Filter ((bool_value#122) = true)
>+- Scan ExistingRDD arrow[bool_value#122,int_value#123L]
> ...
> Caused by: java.lang.UnsupportedOperationException: Cannot evaluate 
> expression: (input[0, boolean, true]){code}
>  
>  
> What appears to be happening is that the query optimizer incorrectly pushes 
> the filter predicate on bool_any_result down below the group by operation, so 
> it is evaluated before the aggregation. This causes the query to error out 
> before Spark attempts to execute it. I have also tried running a variant of 
> this query with functions.count() as the aggregation function and it ran 
> fine, so I believe this is an error that only affects pandas udfs.
>  
> Variant of query with standard aggregation function
> {code:java}
> test_df = spark.createDataFrame(test_data)
> test_df = 
> test_df.groupby('int_value').agg(functions.count('bool_value').alias('bool_counts'))
> test_df = test_df.filter(functions.col('bool_counts') > 0)
> {code}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27691) Issue when running queries using filter predicates on pandas GROUPED_AGG udfs

2019-05-13 Thread Michael Tong (JIRA)
Michael Tong created SPARK-27691:


 Summary: Issue when running queries using filter predicates on 
pandas GROUPED_AGG udfs
 Key: SPARK-27691
 URL: https://issues.apache.org/jira/browse/SPARK-27691
 Project: Spark
  Issue Type: Bug
  Components: Input/Output
Affects Versions: 2.4.2
Reporter: Michael Tong


I am currently running pyspark 2.4.2 and am unable to run the following code.

 
{code:java}
from pyspark.sql import functions, types
import pandas as pd
import random

# initialize test data
test_data = [[False, int(random.random() * 2)] for i in range(1)]
test_data = pd.DataFrame(test_data, columns=['bool_value', 'int_value'])

# pandas udf
pandas_any_udf = functions.pandas_udf(lambda x: x.any(), types.BooleanType(), 
functions.PandasUDFType.GROUPED_AGG)

# create spark DataFrame and build the query
test_df = spark.createDataFrame(test_data)
test_df = 
test_df.groupby('int_value').agg(pandas_any_udf('bool_value').alias('bool_any_result'))
test_df = test_df.filter(functions.col('bool_any_result') == True)

# write to output
test_df.write.parquet('/tmp/mtong/write_test', mode='overwrite')
{code}
 

Below is a truncated error message.

 
{code:java}
Py4JJavaError: An error occurred while calling o1125.parquet. : 
org.apache.spark.SparkException: Job aborted.

...

Exchange hashpartitioning(int_value#123L, 2000)
+- *(1) Filter ((bool_value#122) = true)
   +- Scan ExistingRDD arrow[bool_value#122,int_value#123L]

...

Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: 
(input[0, boolean, true]){code}
 

 

What appears to be happening is that the query optimizer incorrectly pushes the 
filter predicate on bool_any_result down below the group by operation, so it is 
evaluated before the aggregation. This causes the query to error out before 
Spark attempts to execute it. I have also tried running a variant of this query 
with functions.count() as the aggregation function and it ran fine, so I believe 
this is an error that only affects pandas udfs.

 

Variant of query with standard aggregation function
{code:java}
test_df = spark.createDataFrame(test_data)
test_df = 
test_df.groupby('int_value').agg(functions.count('bool_value').alias('bool_counts'))
test_df = test_df.filter(functions.col('bool_counts') > 0)
{code}
 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27681) Use scala.collection.Seq explicitly instead of scala.Seq alias

2019-05-13 Thread Marcelo Vanzin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838715#comment-16838715
 ] 

Marcelo Vanzin commented on SPARK-27681:


bq. varargs methods in Scala will change their signature as they are 
implemented with Seq

Again, why is that a problem? That will only happen in the 2.13 version of 
Spark. There are absolutely no backwards binary compatibility guarantees in that 
case.

The 2.12 build will keep working just like it does today. 

> Use scala.collection.Seq explicitly instead of scala.Seq alias
> --
>
> Key: SPARK-27681
> URL: https://issues.apache.org/jira/browse/SPARK-27681
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML, MLlib, Spark Core, SQL, Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Major
>
> {{scala.Seq}} is widely used in the code, and is an alias for 
> {{scala.collection.Seq}} in Scala 2.12. It will become an alias for 
> {{scala.collection.immutable.Seq}} in Scala 2.13. To avoid API changes, we 
> should simply explicit import and use {{scala.collection.Seq}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27690) Refactor HiveClientImpl#reset() to remove materialized view first

2019-05-13 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27690:


Assignee: Apache Spark

> Refactor HiveClientImpl#reset() to remove materialized view first 
> --
>
> Key: SPARK-27690
> URL: https://issues.apache.org/jira/browse/SPARK-27690
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Assignee: Apache Spark
>Priority: Major
>
> {noformat}
> Cause: 
> org.apache.derby.shared.common.error.DerbySQLIntegrityConstraintViolationException:
>  DELETE on table 'TBLS' caused a violation of foreign key constraint 
> 'MV_TABLES_USED_FK2' for key (4).  The statement has been rolled back.
> at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown 
> Source)
> at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source)
> at 
> org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown 
> Source)
> at org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(Unknown 
> Source)
> at org.apache.derby.impl.jdbc.EmbedConnection.handleException(Unknown Source)
> at org.apache.derby.impl.jdbc.ConnectionChild.handleException(Unknown Source)
> at org.apache.derby.impl.jdbc.EmbedStatement.executeStatement(Unknown Source)
> at 
> org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeBatchElement(Unknown 
> Source)
> at org.apache.derby.impl.jdbc.EmbedStatement.executeLargeBatch(Unknown Source)
> at org.apache.derby.impl.jdbc.EmbedStatement.executeBatch(Unknown 
> Source){noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27690) Refactor HiveClientImpl#reset() to remove materialized view first

2019-05-13 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27690:


Assignee: (was: Apache Spark)

> Refactor HiveClientImpl#reset() to remove materialized view first 
> --
>
> Key: SPARK-27690
> URL: https://issues.apache.org/jira/browse/SPARK-27690
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> {noformat}
> Cause: 
> org.apache.derby.shared.common.error.DerbySQLIntegrityConstraintViolationException:
>  DELETE on table 'TBLS' caused a violation of foreign key constraint 
> 'MV_TABLES_USED_FK2' for key (4).  The statement has been rolled back.
> at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown 
> Source)
> at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source)
> at 
> org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown 
> Source)
> at org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(Unknown 
> Source)
> at org.apache.derby.impl.jdbc.EmbedConnection.handleException(Unknown Source)
> at org.apache.derby.impl.jdbc.ConnectionChild.handleException(Unknown Source)
> at org.apache.derby.impl.jdbc.EmbedStatement.executeStatement(Unknown Source)
> at 
> org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeBatchElement(Unknown 
> Source)
> at org.apache.derby.impl.jdbc.EmbedStatement.executeLargeBatch(Unknown Source)
> at org.apache.derby.impl.jdbc.EmbedStatement.executeBatch(Unknown 
> Source){noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27690) Refactor HiveClientImpl#reset() to remove materialized view first

2019-05-13 Thread Yuming Wang (JIRA)
Yuming Wang created SPARK-27690:
---

 Summary: Refactor HiveClientImpl#reset() to remove materialized 
view first 
 Key: SPARK-27690
 URL: https://issues.apache.org/jira/browse/SPARK-27690
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Yuming Wang


{noformat}
Cause: 
org.apache.derby.shared.common.error.DerbySQLIntegrityConstraintViolationException:
 DELETE on table 'TBLS' caused a violation of foreign key constraint 
'MV_TABLES_USED_FK2' for key (4).  The statement has been rolled back.
at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown 
Source)
at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source)
at 
org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown 
Source)
at org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(Unknown 
Source)
at org.apache.derby.impl.jdbc.EmbedConnection.handleException(Unknown Source)
at org.apache.derby.impl.jdbc.ConnectionChild.handleException(Unknown Source)
at org.apache.derby.impl.jdbc.EmbedStatement.executeStatement(Unknown Source)
at 
org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeBatchElement(Unknown 
Source)
at org.apache.derby.impl.jdbc.EmbedStatement.executeLargeBatch(Unknown Source)
at org.apache.derby.impl.jdbc.EmbedStatement.executeBatch(Unknown 
Source){noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27689) Error to execute hive views with spark

2019-05-13 Thread Yuming Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838696#comment-16838696
 ] 

Yuming Wang commented on SPARK-27689:
-

Thank you, [~lambda]. I will check it.

> Error to execute hive views with spark
> --
>
> Key: SPARK-27689
> URL: https://issues.apache.org/jira/browse/SPARK-27689
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.3.3, 2.4.3
>Reporter: Juan Antonio
>Priority: Critical
>
> I have a Python error when I execute the following code using Hive views, but 
> it works correctly when I run it with Hive tables.
> *Hive databases:*
> CREATE DATABASE schema_p LOCATION "hdfs:///tmp/schema_p";
> *Hive tables:*
> CREATE TABLE schema_p.person(
>  id_person string, 
>  identifier string, 
>  gender string, 
>  start_date string, 
>  end_date string)
> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
> STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
> OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
> LOCATION 'hdfs:///tmp/schema_p/person';
> CREATE TABLE schema_p.product(
>  id_product string,
>  name string,
>  country string,
>  city string,
>  start_date string,
>  end_date string
> )
> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
> STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
> OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
> LOCATION 'hdfs:///tmp/schema_p/product';
> CREATE TABLE schema_p.person_product(
>  id_person string,
>  id_product string,
>  country string,
>  city string,
>  price string,
>  start_date string,
>  end_date string
> )
> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
> STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
> OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
> LOCATION 'hdfs:///tmp/schema_p/person_product';
> *Hive views:*
> CREATE VIEW schema_p.person_v AS SELECT CAST(id_person AS INT) AS id_person, 
> CAST(identifier AS INT) AS identifier, gender AS gender, CAST(start_date AS 
> DATE) AS start_date, CAST(end_date AS DATE) AS end_date FROM schema_p.person;
> CREATE VIEW schema_p.product_v AS SELECT CAST(id_product AS INT) AS 
> id_product, name AS name, country AS country, city AS city, CAST(start_date 
> AS DATE) AS start_date, CAST(end_date AS DATE) AS end_date FROM 
> schema_p.product;
> CREATE VIEW schema_p.person_product_v AS SELECT CAST(id_person AS INT) AS 
> id_person, CAST(id_product AS INT) AS id_product, country AS country, city AS 
> city, CAST(price AS DECIMAL(38,8)) AS price, CAST(start_date AS DATE) AS 
> start_date, CAST(end_date AS DATE) AS end_date FROM schema_p.person_product;
> *
> *Code*:
> def read_tables(sc):
>  in_dict = {
>  'person': 'person_v',
>  'product': 'product_v',
>  'person_product': 'person_product_v'
>  }
>  data_dict = {}
>  for n, d in in_dict.iteritems():
>  data_dict[n] = sc.read.table(d)
>  return data_dict
> def get_population(tables, ref_date_str):
>  person = tables['person']
>  product = tables['product']
>  person_product = tables['person_product']
>  
>  person_product_join = person_product.join(product,'id_product')
>  count_prod = 
> person_product.groupBy('id_product').agg(F.count('id_product').alias('count_prod'))
>  
>  person_count = person_product_join.join(count_prod,'id_product')
>  final1 = person_product_join.join(person_count, 'id_person', 'left')
>  final = final1.withColumn('reference_date', F.lit(ref_date_str))
>  return final
> import pyspark.sql.functions as F
> import functools
> from pyspark.sql.functions import col
> from pyspark.sql.functions import add_months, lit, count, coalesce
> spark.sql('use schema_p')
> data_dict = read_tables(spark)
> data_dict
> population = get_population(data_dict, '2019-04-30')
> population.show()
> *
> *Error:*
> File "", line 1, in 
>  File "", line 10, in get_population
>  File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 
> 931, in join
>  jdf = self._jdf.join(other._jdf, on, how)
>  File 
> "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py",
>  line 1160, in __call__
>  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 69, 
> in deco
>  raise AnalysisException(s.split(': ', 1)[1], stackTrace)
> pyspark.sql.utils.AnalysisException: u'Resolved attribute(s) 
> id_product#124,end_date#129,city#126,price#127,start_date#128,id_person#123,country#125
>  missing from 
> city#47,price#48,start_date#49,id_product#45,end_date#50,id_person#44,country#46
>  in operator !Project [cast(id_person#123 as int) AS id_person#96, 
> cast(id_product#124 

[jira] [Commented] (SPARK-27671) Analysis exception thrown when casting from a nested null in a struct

2019-05-13 Thread Dongjoon Hyun (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838692#comment-16838692
 ] 

Dongjoon Hyun commented on SPARK-27671:
---

Hi, [~viirya]. Could you test old Spark and update the `Affects Version/s` 
please?

> Analysis exception thrown when casting from a nested null in a struct
> -
>
> Key: SPARK-27671
> URL: https://issues.apache.org/jira/browse/SPARK-27671
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Liang-Chi Hsieh
>Priority: Major
>
> When a null in a nested field in struct, casting from the struct throws 
> error, currently.
> {code}
> scala> sql("select cast(struct(1, null) as struct)").show
> scala.MatchError: NullType (of class org.apache.spark.sql.types.NullType$)
>  
>   at org.apache.spark.sql.catalyst.expressions.Cast.castToInt(Cast.scala:447) 
>  
>   at org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:635)  
>  
>   at 
> org.apache.spark.sql.catalyst.expressions.Cast.$anonfun$castStruct$1(Cast.scala:603)
>
> {code}
> {code}
> scala> sql("select * FROM VALUES (('a', (10, null))), (('b', (10, 50))), 
> (('c', null)) AS tab(x, y)").show 
> org.apache.spark.sql.AnalysisException: failed to evaluate expression 
> named_struct('col1', 10, 'col2', NULL): NullType (of class 
> org.apache.spark.sql.t
> ypes.NullType$); line 1 pos 14
>  
>   at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:47)
>  
>   at 
> org.apache.spark.sql.catalyst.analysis.ResolveInlineTables.$anonfun$convert$6(ResolveInlineTables.scala:106)
> 
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27681) Use scala.collection.Seq explicitly instead of scala.Seq alias

2019-05-13 Thread Marcelo Vanzin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838693#comment-16838693
 ] 

Marcelo Vanzin commented on SPARK-27681:


bq. scala.Seq is just a type def for scala.collection.Seq

In 2.12. If you force all the API in 2.13 to explicitly use 
{{scala.collection.Seq}}, all the existing user code that does not explicitly 
import that class will now be using a different type, and may run into issues.

Again, what I'm asking is why it is important for a method {{def foo(s: 
Seq[Any])}} to be exposed as {{def foo(s: scala.collection.Seq[Any])}} in both 
Scala 2.12 and 2.13, instead of using whatever is the default {{Seq}} alias for 
the respective Scala version.

I really don't know how to word my question any more clearly.

> Use scala.collection.Seq explicitly instead of scala.Seq alias
> --
>
> Key: SPARK-27681
> URL: https://issues.apache.org/jira/browse/SPARK-27681
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML, MLlib, Spark Core, SQL, Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Major
>
> {{scala.Seq}} is widely used in the code, and is an alias for 
> {{scala.collection.Seq}} in Scala 2.12. It will become an alias for 
> {{scala.collection.immutable.Seq}} in Scala 2.13. To avoid API changes, we 
> should simply explicit import and use {{scala.collection.Seq}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-27617) Not able to specify LOCATION for internal table

2019-05-13 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun closed SPARK-27617.
-

> Not able to specify LOCATION for internal table
> ---
>
> Key: SPARK-27617
> URL: https://issues.apache.org/jira/browse/SPARK-27617
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.0, 2.2.0, 2.3.0, 2.4.0, 3.0.0
>Reporter: Sujith Chacko
>Priority: Major
>
> In Spark, whenever the user specifies a location URI in CREATE TABLE without 
> the EXTERNAL keyword, the table is treated as an external table.
> Because of this behavior, the following problems have been observed:
> a) the user is not able to set an external location for a managed table.
> b) a compatibility issue with Hive/Impala, where those systems allow a managed 
> table to specify a location URI if the user created the table without the 
> 'EXTERNAL' keyword.
> {code:java}
> scala> spark.sql("""CREATE TABLE IF NOT EXISTS ext2 (name STRING) LOCATION 
> 'D:/spark-2.4.1-bin-hadoop2.7/bin/spark-warehouse/abc_orc13'""");
>  -chgrp: 'HTIPL-23270\None' does not match expected pattern for group
>  Usage: hadoop fs [generic options] -chgrp [-R] GROUP PATH...
>  res15: org.apache.spark.sql.DataFrame = []
> scala> spark.sql("desc formatted ext2").show(false)
>  
> +-+---++---
> |col_name|data_type|comment|
> +-+---++---
> |name|string|null|
> | | | |
> | # Detailed Table Information| | |
> |Database|default| |
> |Table|ext2| |
> |Owner|Administrator| |
> |Created Time|Wed May 01 21:52:57 IST 2019| |
> |Last Access|Thu Jan 01 05:30:00 IST 1970| |
> |Created By|Spark 2.4.1| |
> |Type|EXTERNAL| |
> |Provider|hive| |
> |Table Properties|[transient_lastDdlTime=155672]| |
> |Location|file:/D:/spark-2.4.1-bin-hadoop2.7/bin/spark-warehouse/abc_orc13| |
> |Serde Library|org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe| |
> |InputFormat|org.apache.hadoop.mapred.TextInputFormat| |
> |OutputFormat|org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat| |
> |Storage Properties|[serialization.format=1]| |
> |Partition Provider|Catalog| |
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27617) Not able to specify LOCATION for internal table

2019-05-13 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-27617.
---
Resolution: Invalid

> Not able to specify LOCATION for internal table
> ---
>
> Key: SPARK-27617
> URL: https://issues.apache.org/jira/browse/SPARK-27617
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.0, 2.2.0, 2.3.0, 2.4.0, 3.0.0
>Reporter: Sujith Chacko
>Priority: Major
>
> In Spark, whenever the user specifies a location URI in CREATE TABLE without 
> the EXTERNAL keyword, the table is treated as an external table.
> Because of this behavior, the following problems have been observed:
> a) the user is not able to set an external location for a managed table.
> b) a compatibility issue with Hive/Impala, where those systems allow a managed 
> table to specify a location URI if the user created the table without the 
> 'EXTERNAL' keyword.
> {code:java}
> scala> spark.sql("""CREATE TABLE IF NOT EXISTS ext2 (name STRING) LOCATION 
> 'D:/spark-2.4.1-bin-hadoop2.7/bin/spark-warehouse/abc_orc13'""");
>  -chgrp: 'HTIPL-23270\None' does not match expected pattern for group
>  Usage: hadoop fs [generic options] -chgrp [-R] GROUP PATH...
>  res15: org.apache.spark.sql.DataFrame = []
> scala> spark.sql("desc formatted ext2").show(false)
>  
> +-+---++---
> |col_name|data_type|comment|
> +-+---++---
> |name|string|null|
> | | | |
> | # Detailed Table Information| | |
> |Database|default| |
> |Table|ext2| |
> |Owner|Administrator| |
> |Created Time|Wed May 01 21:52:57 IST 2019| |
> |Last Access|Thu Jan 01 05:30:00 IST 1970| |
> |Created By|Spark 2.4.1| |
> |Type|EXTERNAL| |
> |Provider|hive| |
> |Table Properties|[transient_lastDdlTime=155672]| |
> |Location|file:/D:/spark-2.4.1-bin-hadoop2.7/bin/spark-warehouse/abc_orc13| |
> |Serde Library|org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe| |
> |InputFormat|org.apache.hadoop.mapred.TextInputFormat| |
> |OutputFormat|org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat| |
> |Storage Properties|[serialization.format=1]| |
> |Partition Provider|Catalog| |
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27653) Add max_by() / min_by() SQL aggregate functions

2019-05-13 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-27653:
---

Assignee: Liang-Chi Hsieh

> Add max_by() / min_by() SQL aggregate functions
> ---
>
> Key: SPARK-27653
> URL: https://issues.apache.org/jira/browse/SPARK-27653
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Josh Rosen
>Assignee: Liang-Chi Hsieh
>Priority: Major
> Fix For: 3.0.0
>
>
> It would be useful if Spark SQL supported the {{max_by()}} SQL aggregate 
> function. Quoting from the [Presto 
> docs|https://prestodb.github.io/docs/current/functions/aggregate.html#max_by]:
> {quote}max_by(x, y) → [same as x]
>  Returns the value of x associated with the maximum value of y over all input 
> values.
> {quote}
> {{min_by}} works similarly.
> Technically I can emulate this behavior using window functions but the 
> resulting syntax is much more verbose and non-intuitive compared to 
> {{max_by}} / {{min_by}}.
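
For reference, a minimal sketch of the window-function emulation mentioned 
above, against a made-up toy DataFrame (none of this is from the Spark codebase 
or the proposed implementation):

{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object MaxByEmulation {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("max_by-sketch").getOrCreate()
    import spark.implicits._

    // Toy data: for each key k, find the x associated with the maximum y, i.e. max_by(x, y).
    val df = Seq(("a", 1, 10.0), ("a", 3, 30.0), ("b", 2, 20.0)).toDF("k", "y", "x")

    // Window-function emulation: rank rows per key by y descending, keep the top row per key.
    val w = Window.partitionBy($"k").orderBy($"y".desc)
    df.withColumn("rn", row_number().over(w))
      .where($"rn" === 1)
      .select($"k", $"x".as("max_by_x_y"))
      .show()

    spark.stop()
  }
}
{code}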



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27683) Remove usage of TraversableOnce

2019-05-13 Thread Sean Owen (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838582#comment-16838582
 ] 

Sean Owen commented on SPARK-27683:
---

Yeah, if it comes to defining two source trees we can do that; I was just trying 
to avoid it. I think defining a type alias or using the compat library both 
involve two source trees in some way.

Oh, 2.13 aliases TraversableOnce to IterableOnce? Well, then that would do it 
for us, I guess.

If the alternative is for us to change the signature to accept {{T => 
Iterator[U]}}, then I wouldn't do that, and would just let TraversableOnce use 
the alias in 2.13.

> Remove usage of TraversableOnce
> ---
>
> Key: SPARK-27683
> URL: https://issues.apache.org/jira/browse/SPARK-27683
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML, Spark Core, SQL, Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Major
>
> As with {{Traversable}}, {{TraversableOnce}} is going away in Scala 2.13. We 
> should use {{IterableOnce}} instead. This one is a bigger change as there are 
> more API methods with the existing signature.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27653) Add max_by() / min_by() SQL aggregate functions

2019-05-13 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-27653.
-
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 24557
[https://github.com/apache/spark/pull/24557]

> Add max_by() / min_by() SQL aggregate functions
> ---
>
> Key: SPARK-27653
> URL: https://issues.apache.org/jira/browse/SPARK-27653
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Josh Rosen
>Priority: Major
> Fix For: 3.0.0
>
>
> It would be useful if Spark SQL supported the {{max_by()}} SQL aggregate 
> function. Quoting from the [Presto 
> docs|https://prestodb.github.io/docs/current/functions/aggregate.html#max_by]:
> {quote}max_by(x, y) → [same as x]
>  Returns the value of x associated with the maximum value of y over all input 
> values.
> {quote}
> {{min_by}} works similarly.
> Technically I can emulate this behavior using window functions but the 
> resulting syntax is much more verbose and non-intuitive compared to 
> {{max_by}} / {{min_by}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27648) In Spark2.4 Structured Streaming:The executor storage memory increasing over time

2019-05-13 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838558#comment-16838558
 ] 

Jungtaek Lim commented on SPARK-27648:
--

Exposing a small part again and again just adds confusion - given there's no 
clear symptom of an actual issue here, this is better asked on the user mailing 
list.

> In Spark2.4 Structured Streaming:The executor storage memory increasing over 
> time
> -
>
> Key: SPARK-27648
> URL: https://issues.apache.org/jira/browse/SPARK-27648
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: tommy duan
>Priority: Major
> Attachments: houragg(1).out, houragg_filter.csv, 
> image-2019-05-09-17-51-14-036.png, image-2019-05-10-17-49-42-034.png
>
>
> *Spark Program Code Business:*
>  Read the topic on kafka, aggregate the stream data sources, and then output 
> it to another topic line of kafka.
> *Problem Description:*
>  *1) Using spark structured streaming in CDH environment (spark 2.2)*, memory 
> overflow problems often occur (because of too many versions of state stored 
> in memory, this bug has been modified in spark 2.4).
> {code:java}
> /spark-submit \
> --conf “spark.yarn.executor.memoryOverhead=4096M”
> --num-executors 15 \
> --executor-memory 3G \
> --executor-cores 2 \
> --driver-memory 6G \{code}
> {code}
> Executor memory exceptions occur when running with this submit resource under 
> SPARK 2.2 and the normal running time does not exceed one day.
> The solution is to set the executor memory larger than before 
> {code:java}
>  My spark-submit script is as follows:
> /spark-submit \
> --conf "spark.yarn.executor.memoryOverhead=4096M"
> --num-executors 15 \
> --executor-memory 46G \
> --executor-cores 3 \
> --driver-memory 6G \
> ...{code}
> In this case, the spark program can be guaranteed to run stably for a long 
> time, and the executor storage memory is less than 10M (it has been running 
> stably for more than 20 days).
> *2) From the upgrade information of Spark 2.4, we can see that the problem of 
> large memory consumption of state storage has been solved in Spark 2.4.* 
>  So we upgraded spark to SPARK 2.4 under CDH, tried to run the spark program, 
> and found that the use of memory was reduced.
>  But a problem arises, as the running time increases, the storage memory of 
> executor is growing (see Executors - > Storage Memory from the Spark on Yarn 
> Resource Manager UI).
>  This program has been running for 14 days (under SPARK 2.2, running with 
> this submit resource, the normal running time is not more than one day, 
> Executor memory abnormalities will occur).
>  The script submitted by the program under spark2.4 is as follows:
> {code:java}
> /spark-submit \
>  --conf “spark.yarn.executor.memoryOverhead=4096M”
>  --num-executors 15 \
>  --executor-memory 3G \
>  --executor-cores 2 \
>  --driver-memory 6G 
> {code}
> Under Spark 2.4, I counted the size of executor memory as time went by during 
> the running of the spark program:
> |Run-time(hour)|Storage Memory size(MB)|Memory growth rate(MB/hour)|
> |23.5H|41.6MB/1.5GB|1.770212766|
> |108.4H|460.2MB/1.5GB|4.245387454|
> |131.7H|559.1MB/1.5GB|4.245254366|
> |135.4H|575MB/1.5GB|4.246676514|
> |153.6H|641.2MB/1.5GB|4.174479167|
> |219H|888.1MB/1.5GB|4.055251142|
> |263H|1126.4MB/1.5GB|4.282889734|
> |309H|1228.8MB/1.5GB|3.976699029|



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27683) Remove usage of TraversableOnce

2019-05-13 Thread Lukas Rytz (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838548#comment-16838548
 ] 

Lukas Rytz commented on SPARK-27683:


bq. This makes it hard to create an API method that works in both

Do you see a problem, in general, if the 2.12 flatMap takes a {{T => 
TraversableOnce[U]}}, while the 2.13 API takes a {{T => IterableOnce[U]}}? I 
think this should be the preferred solution. It might seem simpler if both 
2.12 and 2.13 used the "same" type, e.g. {{scala.collection.Iterable}}, but in 
reality they are not the same either, due to the collections rewrite and to 
major-version incompatibilities in general. And it would change / restrict 
the API for no good reason.

Using {{TraversableOnce}} / {{IterableOnce}} can be achieved either with a type 
alias, or by having separate source files for 2.12 and 2.13. As you mention, 
there's already the {{scala.collection.compat.IterableOnce}} alias in the 
compat library. Alternatively you can use the deprecated 
{{scala.collection.TraversableOnce}} alias that exists in the 2.13 standard 
library. Or you can define your own.

As far as I can see, the disadvantage of using an alias is that it shows up in 
Scaladocs, the API, and in source code. So it's an indirection that users have 
to go through when looking at the API. Version-specific source files would 
avoid that.
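
As a rough illustration of the version-specific alias approach (the package, 
alias, and file names here are hypothetical, and the two conceptual files are 
shown in one block for brevity):

{code:java}
// Placed only in the 2.12 source tree, e.g. src/main/scala-2.12/sparkcompat/package.scala.
// A parallel file under src/main/scala-2.13 would instead declare
//   type IterOnce[+A] = scala.collection.IterableOnce[A]
package object sparkcompat {
  type IterOnce[+A] = scala.collection.TraversableOnce[A]
}

// Shared sources are then written once against the alias and compile on both versions.
object FlatMapExample {
  def flatMapAll[T, U](xs: Seq[T])(f: T => sparkcompat.IterOnce[U]): Iterator[U] =
    xs.iterator.flatMap(f)

  def main(args: Array[String]): Unit =
    println(flatMapAll(Seq(1, 2, 3))(x => List(x, x * 10)).toList)
}
{code}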


> Remove usage of TraversableOnce
> ---
>
> Key: SPARK-27683
> URL: https://issues.apache.org/jira/browse/SPARK-27683
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML, Spark Core, SQL, Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Major
>
> As with {{Traversable}}, {{TraversableOnce}} is going away in Scala 2.13. We 
> should use {{IterableOnce}} instead. This one is a bigger change as there are 
> more API methods with the existing signature.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27689) Error to execute hive views with spark

2019-05-13 Thread Juan Antonio (JIRA)
Juan Antonio created SPARK-27689:


 Summary: Error to execute hive views with spark
 Key: SPARK-27689
 URL: https://issues.apache.org/jira/browse/SPARK-27689
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.3, 2.3.3, 2.3.0
Reporter: Juan Antonio


I have a Python error when I execute the following code using Hive views, but it 
works correctly when I run it with Hive tables.

*Hive databases:*

CREATE DATABASE schema_p LOCATION "hdfs:///tmp/schema_p";

*Hive tables:*

CREATE TABLE schema_p.person(
 id_person string, 
 identifier string, 
 gender string, 
 start_date string, 
 end_date string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 'hdfs:///tmp/schema_p/person';

CREATE TABLE schema_p.product(
 id_product string,
 name string,
 country string,
 city string,
 start_date string,
 end_date string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 'hdfs:///tmp/schema_p/product';

CREATE TABLE schema_p.person_product(
 id_person string,
 id_product string,
 country string,
 city string,
 price string,
 start_date string,
 end_date string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 'hdfs:///tmp/schema_p/person_product';

*Hive views:*

CREATE VIEW schema_p.person_v AS SELECT CAST(id_person AS INT) AS id_person, 
CAST(identifier AS INT) AS identifier, gender AS gender, CAST(start_date AS 
DATE) AS start_date, CAST(end_date AS DATE) AS end_date FROM schema_p.person;
CREATE VIEW schema_p.product_v AS SELECT CAST(id_product AS INT) AS id_product, 
name AS name, country AS country, city AS city, CAST(start_date AS DATE) AS 
start_date, CAST(end_date AS DATE) AS end_date FROM schema_p.product;
CREATE VIEW schema_p.person_product_v AS SELECT CAST(id_person AS INT) AS 
id_person, CAST(id_product AS INT) AS id_product, country AS country, city AS 
city, CAST(price AS DECIMAL(38,8)) AS price, CAST(start_date AS DATE) AS 
start_date, CAST(end_date AS DATE) AS end_date FROM schema_p.person_product;

*

*Code*:

def read_tables(sc):
 in_dict = {
 'person': 'person_v',
 'product': 'product_v',
 'person_product': 'person_product_v'
 }
 data_dict = {}
 for n, d in in_dict.iteritems():
 data_dict[n] = sc.read.table(d)
 return data_dict

def get_population(tables, ref_date_str):
 person = tables['person']
 product = tables['product']
 person_product = tables['person_product']
 
 person_product_join = person_product.join(product,'id_product')
 count_prod = 
person_product.groupBy('id_product').agg(F.count('id_product').alias('count_prod'))
 
 person_count = person_product_join.join(count_prod,'id_product')
 final1 = person_product_join.join(person_count, 'id_person', 'left')
 final = final1.withColumn('reference_date', F.lit(ref_date_str))
 return final

import pyspark.sql.functions as F
import functools
from pyspark.sql.functions import col
from pyspark.sql.functions import add_months, lit, count, coalesce

spark.sql('use schema_p')
data_dict = read_tables(spark)
data_dict

population = get_population(data_dict, '2019-04-30')
population.show()

*

*Error:*

File "", line 1, in 
 File "", line 10, in get_population
 File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 
931, in join
 jdf = self._jdf.join(other._jdf, on, how)
 File 
"/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py",
 line 1160, in __call__
 File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 69, in 
deco
 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Resolved attribute(s) 
id_product#124,end_date#129,city#126,price#127,start_date#128,id_person#123,country#125
 missing from 
city#47,price#48,start_date#49,id_product#45,end_date#50,id_person#44,country#46
 in operator !Project [cast(id_person#123 as int) AS id_person#96, 
cast(id_product#124 as int) AS id_product#97, cast(country#125 as string) AS 
country#98, cast(city#126 as string) AS city#99, cast(price#127 as 
decimal(38,8)) AS price#100, cast(start_date#128 as date) AS start_date#101, 
cast(end_date#129 as date) AS end_date#102]. Attribute(s) with the same name 
appear in the operation: 
id_product,end_date,city,price,start_date,id_person,country. Please check if 
the right attribute(s) are used.;;
Project [id_person#44, 

[jira] [Commented] (SPARK-27683) Remove usage of TraversableOnce

2019-05-13 Thread Stefan Zeiger (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838546#comment-16838546
 ] 

Stefan Zeiger commented on SPARK-27683:
---

You could create your own type alias in a conditional source file. This should 
not affect binary compatibility. It's a lightweight alternative to 
scala-collections-compat but it has to live with the same restrictions. In 
particular, you won't be able to put the alias into `scala.collection` because 
it has to go into a package object (which already exists in the standard 
library).

Or you could continue using `TraversableOnce` for now. How many different Scala 
versions do you intend to support? I don't see any reason why we would have to 
remove it in 2.14, and we can't remove it in 3.0 either, because 3.0 is supposed 
to keep compatibility with 2.14; so the `TraversableOnce` alias shouldn't go 
away before Scala 3.1 at the earliest.

> Remove usage of TraversableOnce
> ---
>
> Key: SPARK-27683
> URL: https://issues.apache.org/jira/browse/SPARK-27683
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML, Spark Core, SQL, Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Major
>
> As with {{Traversable}}, {{TraversableOnce}} is going away in Scala 2.13. We 
> should use {{IterableOnce}} instead. This one is a bigger change as there are 
> more API methods with the existing signature.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27648) In Spark2.4 Structured Streaming:The executor storage memory increasing over time

2019-05-13 Thread tommy duan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838540#comment-16838540
 ] 

tommy duan edited comment on SPARK-27648 at 5/13/19 1:36 PM:
-

Hi [~kabhwan]&[~gsomogyi],

No, the code includes only one sink.

 
{code:java}
public void start(SparkSession sparkSession) throws Exception {
  init(sparkSession);
   Dataset dsSourceData = loadSourceData(sparkSession); // dsSourceData
   Dataset dsPreDropDupl = procSrcDataPreDropDupl(dsSourceData);
   Dataset dsDropDuplicates = dropDuplicates(sparkSession, dsPreDropDupl);
   Dataset dsAfterDropDupl = procSrcDataAfterDropDupl(dsDropDuplicates);
   if (this._DropDuplType.equals("keepfirst")||this._DropDuplType.equals("no")) 
{
 Dataset dsAgg = executeAgg(sparkSession, dsAfterDropDupl);
 writeResult(sparkSession, dsAgg);
   }
   if (this._DropDuplType.equals("keeplast")) {
 writeResult(sparkSession, dsAfterDropDupl);
   }
 }{code}
 

The property "_DropDuplType" value equals "keepfirst", so there is only one 
sink.

There are two state operators: *{color:#ff}dropDuplKeepFirst{color}* and 
*{color:#ff}executeAgg{color}*.

The *first state operator* reads the Kafka source topic and deduplicates it, 
then sinks.

 
{code:java}
private Dataset dropDuplKeepFirst(SparkSession sparkSession, Dataset 
dsRows, String watermarkMinutes,
  String queryName) throws AnalysisException {
   if (watermarkMinutes.equals("0")) {
 return dsRows;
   }
   Dataset dsDropDuplicates = dsRows.withWatermark("timestamp", 
watermarkMinutes + " minutes");
   dsDropDuplicates = dsDropDuplicates.dropDuplicates("int_id", "timestamp");
   return dsDropDuplicates;
 }{code}
 

The *second state operator* is just the aggregation.

The functions 
{code:java}
procSrcDataPreDropDupl{code}
 and 
{code:java}
procSrcDataAfterDropDupl{code}
do nothing.

 


was (Author: yy3b2007com):
Hi [~kabhwan]&[~gsomogyi],

Yes,the code include one sink;

 
{code:java}
public void start(SparkSession sparkSession) throws Exception {
  init(sparkSession);
   Dataset dsSourceData = loadSourceData(sparkSession); // dsSourceData
   Dataset dsPreDropDupl = procSrcDataPreDropDupl(dsSourceData);
   Dataset dsDropDuplicates = dropDuplicates(sparkSession, dsPreDropDupl);
   Dataset dsAfterDropDupl = procSrcDataAfterDropDupl(dsDropDuplicates);
   if (this._DropDuplType.equals("keepfirst")||this._DropDuplType.equals("no")) 
{
 Dataset dsAgg = executeAgg(sparkSession, dsAfterDropDupl);
 writeResult(sparkSession, dsAgg);
   }
   if (this._DropDuplType.equals("keeplast")) {
 writeResult(sparkSession, dsAfterDropDupl);
   }
 }{code}
 

The property "_DropDuplTypes" value equals "keepfirst",So the sink just only 
one。

The state operator include tow *{color:#ff}dropDpulKeepFirst{color}* and 
*{color:#ff}executeAgg{color}* 

*First state operator* is read the topic of KAFKA source and do duplication 
with it,then sink

 
{code:java}
private Dataset dropDuplKeepFirst(SparkSession sparkSession, Dataset 
dsRows, String watermarkMinutes,
  String queryName) throws AnalysisException {
   if (watermarkMinutes.equals("0")) {
 return dsRows;
   }
   Dataset dsDropDuplicates = dsRows.withWatermark("timestamp", 
watermarkMinutes + " minutes");
   dsDropDuplicates = dsDropDuplicates.dropDuplicates("int_id", "timestamp");
   return dsDropDuplicates;
 }{code}
 

*Second state operator* is just agg。

The function 
{code:java}
procSrcDataAfterDropDupl{code}
 and 
{code:java}
procSrcDataAfterDropDupl{code}
do nothing。

 

> In Spark2.4 Structured Streaming:The executor storage memory increasing over 
> time
> -
>
> Key: SPARK-27648
> URL: https://issues.apache.org/jira/browse/SPARK-27648
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: tommy duan
>Priority: Major
> Attachments: houragg(1).out, houragg_filter.csv, 
> image-2019-05-09-17-51-14-036.png, image-2019-05-10-17-49-42-034.png
>
>
> *Spark Program Code Business:*
>  Read the topic on kafka, aggregate the stream data sources, and then output 
> it to another topic line of kafka.
> *Problem Description:*
>  *1) Using spark structured streaming in CDH environment (spark 2.2)*, memory 
> overflow problems often occur (because of too many versions of state stored 
> in memory, this bug has been modified in spark 2.4).
> {code:java}
> /spark-submit \
> --conf “spark.yarn.executor.memoryOverhead=4096M”
> --num-executors 15 \
> --executor-memory 3G \
> --executor-cores 2 \
> --driver-memory 6G \{code}
> {code}
> Executor memory exceptions occur when running with this submit resource under 
> SPARK 2.2 and the normal running time does not exceed one day.
> The solution is to set the executor memory larger 

[jira] [Comment Edited] (SPARK-27648) In Spark2.4 Structured Streaming:The executor storage memory increasing over time

2019-05-13 Thread tommy duan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838540#comment-16838540
 ] 

tommy duan edited comment on SPARK-27648 at 5/13/19 1:34 PM:
-

Hi [~kabhwan]&[~gsomogyi],

Yes,the code include one sink;

 
{code:java}
public void start(SparkSession sparkSession) throws Exception {
  init(sparkSession);
   Dataset dsSourceData = loadSourceData(sparkSession); // dsSourceData
   Dataset dsPreDropDupl = procSrcDataPreDropDupl(dsSourceData);
   Dataset dsDropDuplicates = dropDuplicates(sparkSession, dsPreDropDupl);
   Dataset dsAfterDropDupl = procSrcDataAfterDropDupl(dsDropDuplicates);
   if (this._DropDuplType.equals("keepfirst")||this._DropDuplType.equals("no")) 
{
 Dataset dsAgg = executeAgg(sparkSession, dsAfterDropDupl);
 writeResult(sparkSession, dsAgg);
   }
   if (this._DropDuplType.equals("keeplast")) {
 writeResult(sparkSession, dsAfterDropDupl);
   }
 }{code}
 

The property "_DropDuplTypes" value equals "keepfirst",So the sink just only 
one。

The state operator include tow *{color:#ff}dropDpulKeepFirst{color}* and 
*{color:#ff}executeAgg{color}* 

*First state operator* is read the topic of KAFKA source and do duplication 
with it,then sink

 
{code:java}
private Dataset<Row> dropDuplKeepFirst(SparkSession sparkSession, Dataset<Row> dsRows,
    String watermarkMinutes, String queryName) throws AnalysisException {
  if (watermarkMinutes.equals("0")) {
    return dsRows;
  }
  Dataset<Row> dsDropDuplicates = dsRows.withWatermark("timestamp", watermarkMinutes + " minutes");
  dsDropDuplicates = dsDropDuplicates.dropDuplicates("int_id", "timestamp");
  return dsDropDuplicates;
}{code}
 

The *second state operator* is just the aggregation.

The functions 
{code:java}
procSrcDataPreDropDupl{code}
 and 
{code:java}
procSrcDataAfterDropDupl{code}
do nothing.

 


was (Author: yy3b2007com):
Hi [~kabhwan]&[~gsomogyi],

Yes, the code includes one sink.

 
{code:java}
public void start(SparkSession sparkSession) throws Exception {
  init(sparkSession);
  Dataset<Row> dsSourceData = loadSourceData(sparkSession);
  Dataset<Row> dsPreDropDupl = procSrcDataPreDropDupl(dsSourceData);
  Dataset<Row> dsDropDuplicates = dropDuplicates(sparkSession, dsPreDropDupl);
  Dataset<Row> dsAfterDropDupl = procSrcDataAfterDropDupl(dsDropDuplicates);
  if (this._DropDuplType.equals("keepfirst") || this._DropDuplType.equals("no")) {
    Dataset<Row> dsAgg = executeAgg(sparkSession, dsAfterDropDupl);
    writeResult(sparkSession, dsAgg);
  }
  if (this._DropDuplType.equals("keeplast")) {
    writeResult(sparkSession, dsAfterDropDupl);
  }
}{code}
 

The property "_DropDuplTypes" value equals "keepfirst",So the sink just only 
one。

The state operator include tow *{color:#FF}dropDpulKeepFirst{color}* and 
*{color:#FF}executeAgg{color}* 

*First state operator* is read the topic of KAFKA source and do duplication 
with it,then sink

 
{code:java}
private Dataset<Row> dropDuplKeepFirst(SparkSession sparkSession, Dataset<Row> dsRows,
    String watermarkMinutes, String queryName) throws AnalysisException {
  if (watermarkMinutes.equals("0")) {
    return dsRows;
  }
  Dataset<Row> dsDropDuplicates = dsRows.withWatermark("timestamp", watermarkMinutes + " minutes");
  dsDropDuplicates = dsDropDuplicates.dropDuplicates("int_id", "timestamp");
  return dsDropDuplicates;
}{code}
 

The *second state operator* is just the aggregation.

 

> In Spark2.4 Structured Streaming:The executor storage memory increasing over 
> time
> -
>
> Key: SPARK-27648
> URL: https://issues.apache.org/jira/browse/SPARK-27648
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: tommy duan
>Priority: Major
> Attachments: houragg(1).out, houragg_filter.csv, 
> image-2019-05-09-17-51-14-036.png, image-2019-05-10-17-49-42-034.png
>
>
> *Spark Program Code Business:*
>  Read the topic on kafka, aggregate the stream data sources, and then output 
> it to another topic line of kafka.
> *Problem Description:*
>  *1) Using spark structured streaming in CDH environment (spark 2.2)*, memory 
> overflow problems often occur (because of too many versions of state stored 
> in memory, this bug has been modified in spark 2.4).
> {code:java}
> /spark-submit \
> --conf “spark.yarn.executor.memoryOverhead=4096M”
> --num-executors 15 \
> --executor-memory 3G \
> --executor-cores 2 \
> --driver-memory 6G \{code}
> {code}
> Executor memory exceptions occur when running with this submit resource under 
> SPARK 2.2 and the normal running time does not exceed one day.
> The solution is to set the executor memory larger than before 
> {code:java}
>  My spark-submit script is as follows:
> /spark-submit\
> conf "spark. yarn. executor. 

[jira] [Commented] (SPARK-27648) In Spark2.4 Structured Streaming:The executor storage memory increasing over time

2019-05-13 Thread tommy duan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838540#comment-16838540
 ] 

tommy duan commented on SPARK-27648:


Hi [~kabhwan]&[~gsomogyi],

Yes, the code includes one sink.

 
{code:java}
public void start(SparkSession sparkSession) throws Exception {
  init(sparkSession);
  Dataset<Row> dsSourceData = loadSourceData(sparkSession);
  Dataset<Row> dsPreDropDupl = procSrcDataPreDropDupl(dsSourceData);
  Dataset<Row> dsDropDuplicates = dropDuplicates(sparkSession, dsPreDropDupl);
  Dataset<Row> dsAfterDropDupl = procSrcDataAfterDropDupl(dsDropDuplicates);
  if (this._DropDuplType.equals("keepfirst") || this._DropDuplType.equals("no")) {
    Dataset<Row> dsAgg = executeAgg(sparkSession, dsAfterDropDupl);
    writeResult(sparkSession, dsAgg);
  }
  if (this._DropDuplType.equals("keeplast")) {
    writeResult(sparkSession, dsAfterDropDupl);
  }
}{code}
 

The property "_DropDuplTypes" value equals "keepfirst",So the sink just only 
one。

The state operator include tow *{color:#FF}dropDpulKeepFirst{color}* and 
*{color:#FF}executeAgg{color}* 

*First state operator* is read the topic of KAFKA source and do duplication 
with it,then sink

 
{code:java}
private Dataset<Row> dropDuplKeepFirst(SparkSession sparkSession, Dataset<Row> dsRows,
    String watermarkMinutes, String queryName) throws AnalysisException {
  if (watermarkMinutes.equals("0")) {
    return dsRows;
  }
  Dataset<Row> dsDropDuplicates = dsRows.withWatermark("timestamp", watermarkMinutes + " minutes");
  dsDropDuplicates = dsDropDuplicates.dropDuplicates("int_id", "timestamp");
  return dsDropDuplicates;
}{code}
 

The *second state operator* is just the aggregation.

 

> In Spark2.4 Structured Streaming:The executor storage memory increasing over 
> time
> -
>
> Key: SPARK-27648
> URL: https://issues.apache.org/jira/browse/SPARK-27648
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: tommy duan
>Priority: Major
> Attachments: houragg(1).out, houragg_filter.csv, 
> image-2019-05-09-17-51-14-036.png, image-2019-05-10-17-49-42-034.png
>
>
> *Spark Program Code Business:*
>  Read the topic on kafka, aggregate the stream data sources, and then output 
> it to another topic line of kafka.
> *Problem Description:*
>  *1) Using spark structured streaming in CDH environment (spark 2.2)*, memory 
> overflow problems often occur (because of too many versions of state stored 
> in memory, this bug has been modified in spark 2.4).
> {code:java}
> /spark-submit \
> --conf “spark.yarn.executor.memoryOverhead=4096M”
> --num-executors 15 \
> --executor-memory 3G \
> --executor-cores 2 \
> --driver-memory 6G \{code}
> {code}
> Executor memory exceptions occur when running with this submit resource under 
> SPARK 2.2 and the normal running time does not exceed one day.
> The solution is to set the executor memory larger than before 
> {code:java}
>  My spark-submit script is as follows:
> /spark-submit\
> conf "spark. yarn. executor. memoryOverhead = 4096M"
> num-executors 15\
> executor-memory 46G\
> executor-cores 3\
> driver-memory 6G\
> ...{code}
> In this case, the spark program can be guaranteed to run stably for a long 
> time, and the executor storage memory is less than 10M (it has been running 
> stably for more than 20 days).
> *2) From the upgrade information of Spark 2.4, we can see that the problem of 
> large memory consumption of state storage has been solved in Spark 2.4.* 
>  So we upgraded spark to SPARK 2.4 under CDH, tried to run the spark program, 
> and found that the use of memory was reduced.
>  But a problem arises, as the running time increases, the storage memory of 
> executor is growing (see Executors - > Storage Memory from the Spark on Yarn 
> Resource Manager UI).
>  This program has been running for 14 days (under SPARK 2.2, running with 
> this submit resource, the normal running time is not more than one day, 
> Executor memory abnormalities will occur).
>  The script submitted by the program under spark2.4 is as follows:
> {code:java}
> /spark-submit \
>  --conf “spark.yarn.executor.memoryOverhead=4096M”
>  --num-executors 15 \
>  --executor-memory 3G \
>  --executor-cores 2 \
>  --driver-memory 6G 
> {code}
> Under Spark 2.4, I counted the size of executor memory as time went by during 
> the running of the spark program:
> |Run-time(hour)|Storage Memory size(MB)|Memory growth rate(MB/hour)|
> |23.5H|41.6MB/1.5GB|1.770212766|
> |108.4H|460.2MB/1.5GB|4.245387454|
> |131.7H|559.1MB/1.5GB|4.245254366|
> |135.4H|575MB/1.5GB|4.246676514|
> |153.6H|641.2MB/1.5GB|4.174479167|
> |219H|888.1MB/1.5GB|4.055251142|
> |263H|1126.4MB/1.5GB|4.282889734|
> |309H|1228.8MB/1.5GB|3.976699029|



--
This message 

[jira] [Commented] (SPARK-27688) Beeline should show database in the prompt

2019-05-13 Thread Sandeep Katta (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838501#comment-16838501
 ] 

Sandeep Katta commented on SPARK-27688:
---

Okay, I will work on your fork and raise a PR against your fork.

> Beeline should show database in the prompt
> --
>
> Key: SPARK-27688
> URL: https://issues.apache.org/jira/browse/SPARK-27688
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Sandeep Katta
>Priority: Minor
>
> Since [HIVE-14123|https://issues.apache.org/jira/browse/HIVE-14123] supports 
> display of the database in Beeline, Spark should also support this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27688) Beeline should show database in the prompt

2019-05-13 Thread Yuming Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838499#comment-16838499
 ] 

Yuming Wang edited comment on SPARK-27688 at 5/13/19 12:18 PM:
---

Sorry [~sandeep.katta2007]. We do not need to do any changes once we upgrade the 
built-in Hive to 2.3. You could test it here: 
[https://github.com/wangyum/spark/tree/hadoop-3.1-on-jenkins]


was (Author: q79969786):
Sorry [~sandeep.katta2007]. We do not need do any changes once uprade the 
built-in Hive to 2.3. You could test it here: 
[https://github.com/wangyum/spark/tree/hadoop-3.1-on-jenkins]

> Beeline should show database in the prompt
> --
>
> Key: SPARK-27688
> URL: https://issues.apache.org/jira/browse/SPARK-27688
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Sandeep Katta
>Priority: Minor
>
> Since [HIVE-14123|https://issues.apache.org/jira/browse/HIVE-14123] supports 
> display of the database in Beeline, Spark should also support this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27688) Beeline should show database in the prompt

2019-05-13 Thread Yuming Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838499#comment-16838499
 ] 

Yuming Wang commented on SPARK-27688:
-

Sorry [~sandeep.katta2007]. We do not need to do any changes once we upgrade the 
built-in Hive to 2.3. You could test it here: 
[https://github.com/wangyum/spark/tree/hadoop-3.1-on-jenkins]

> Beeline should show database in the prompt
> --
>
> Key: SPARK-27688
> URL: https://issues.apache.org/jira/browse/SPARK-27688
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Sandeep Katta
>Priority: Minor
>
> Since [HIVE-14123|https://issues.apache.org/jira/browse/HIVE-14123] supports 
> display of the database in Beeline, Spark should also support this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime and maxThreadNumber configurable

2019-05-13 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-26601.
--
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 23670
[https://github.com/apache/spark/pull/23670]

> Make broadcast-exchange thread pool keepalivetime and maxThreadNumber 
> configurable
> --
>
> Key: SPARK-26601
> URL: https://issues.apache.org/jira/browse/SPARK-26601
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: zhoukang
>Assignee: zhoukang
>Priority: Minor
> Fix For: 3.0.0
>
> Attachments: 26601-largeobject.png, 26601-occupy.png, 
> 26601-path2gcroot.png
>
>
> Currently, the thread number of the broadcast-exchange thread pool is fixed, and 
> keepAliveSeconds is also fixed at 60s.
> {code:java}
> object BroadcastExchangeExec {
>   private[execution] val executionContext = 
> ExecutionContext.fromExecutorService(
> ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
> }
>  /**
>* Create a cached thread pool whose max number of threads is 
> `maxThreadNumber`. Thread names
>* are formatted as prefix-ID, where ID is a unique, sequentially assigned 
> integer.
>*/
>   def newDaemonCachedThreadPool(
>   prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): 
> ThreadPoolExecutor = {
> val threadFactory = namedThreadFactory(prefix)
> val threadPool = new ThreadPoolExecutor(
>   maxThreadNumber, // corePoolSize: the max number of threads to create 
> before queuing the tasks
>   maxThreadNumber, // maximumPoolSize: because we use 
> LinkedBlockingDeque, this one is not used
>   keepAliveSeconds,
>   TimeUnit.SECONDS,
>   new LinkedBlockingQueue[Runnable],
>   threadFactory)
> threadPool.allowCoreThreadTimeOut(true)
> threadPool
>   }
> {code}
> But sometimes, if the Thread objects are not garbage-collected quickly, this may cause a 
> server (driver) OOM. In such cases, we need to make this thread pool 
> configurable.
> Below is an example:
>  !26601-occupy.png! 
>  !26601-largeobject.png! 
>  !26601-path2gcroot.png! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26601) Make broadcast-exchange thread pool keepalivetime and maxThreadNumber configurable

2019-05-13 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-26601:


Assignee: zhoukang

> Make broadcast-exchange thread pool keepalivetime and maxThreadNumber 
> configurable
> --
>
> Key: SPARK-26601
> URL: https://issues.apache.org/jira/browse/SPARK-26601
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: zhoukang
>Assignee: zhoukang
>Priority: Minor
> Attachments: 26601-largeobject.png, 26601-occupy.png, 
> 26601-path2gcroot.png
>
>
> Currently, the thread number of the broadcast-exchange thread pool is fixed, and 
> keepAliveSeconds is also fixed at 60s.
> {code:java}
> object BroadcastExchangeExec {
>   private[execution] val executionContext = 
> ExecutionContext.fromExecutorService(
> ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
> }
>  /**
>* Create a cached thread pool whose max number of threads is 
> `maxThreadNumber`. Thread names
>* are formatted as prefix-ID, where ID is a unique, sequentially assigned 
> integer.
>*/
>   def newDaemonCachedThreadPool(
>   prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): 
> ThreadPoolExecutor = {
> val threadFactory = namedThreadFactory(prefix)
> val threadPool = new ThreadPoolExecutor(
>   maxThreadNumber, // corePoolSize: the max number of threads to create 
> before queuing the tasks
>   maxThreadNumber, // maximumPoolSize: because we use 
> LinkedBlockingDeque, this one is not used
>   keepAliveSeconds,
>   TimeUnit.SECONDS,
>   new LinkedBlockingQueue[Runnable],
>   threadFactory)
> threadPool.allowCoreThreadTimeOut(true)
> threadPool
>   }
> {code}
> But sometimes, if the Thread objects are not garbage-collected quickly, this may cause a 
> server (driver) OOM. In such cases, we need to make this thread pool 
> configurable.
> Below is an example:
>  !26601-occupy.png! 
>  !26601-largeobject.png! 
>  !26601-path2gcroot.png! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27688) Beeline should show database in the prompt

2019-05-13 Thread Sandeep Katta (JIRA)
Sandeep Katta created SPARK-27688:
-

 Summary: Beeline should show database in the prompt
 Key: SPARK-27688
 URL: https://issues.apache.org/jira/browse/SPARK-27688
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.4.3
Reporter: Sandeep Katta


Since [HIVE-14123|https://issues.apache.org/jira/browse/HIVE-14123] supports 
display of the database in Beeline, Spark should also support this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27688) Beeline should show database in the prompt

2019-05-13 Thread Sandeep Katta (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838460#comment-16838460
 ] 

Sandeep Katta commented on SPARK-27688:
---

I will be working on this and will raise a PR soon.

> Beeline should show database in the prompt
> --
>
> Key: SPARK-27688
> URL: https://issues.apache.org/jira/browse/SPARK-27688
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.4.3
>Reporter: Sandeep Katta
>Priority: Minor
>
> Since [HIVE-14123|https://issues.apache.org/jira/browse/HIVE-14123] supports 
> display of the database in Beeline, Spark should also support this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27687) Kafka consumer cache parameter rename and documentation

2019-05-13 Thread Gabor Somogyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi updated SPARK-27687:
--
Component/s: Documentation

> Kafka consumer cache parameter rename and documentation
> ---
>
> Key: SPARK-27687
> URL: https://issues.apache.org/jira/browse/SPARK-27687
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Gabor Somogyi
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21827) Task fail due to executor exception when enable Sasl Encryption

2019-05-13 Thread JIRA


[ 
https://issues.apache.org/jira/browse/SPARK-21827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838421#comment-16838421
 ] 

Sébastien BARNOUD commented on SPARK-21827:
---

Hi,

We have already been using SASL with Kafka for a while, with huge volumes. I just had a look at the Kafka implementation:

[https://github.com/apache/kafka/blob/6ca899e56d451eef04e81b0f4d88bdb10f3cf4b3/clients/src/main/java/org/apache/kafka/common/network/Selector.java]

The KafkaChannel (including the SaslClient) is managed by this class, which is clearly documented as NOT thread safe. That is probably the reason why we never noticed the issue with Kafka and SASL.

 

> Task fail due to executor exception when enable Sasl Encryption
> ---
>
> Key: SPARK-21827
> URL: https://issues.apache.org/jira/browse/SPARK-21827
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 1.6.1, 2.1.1, 2.2.0
> Environment: OS: RedHat 7.1 64bit
>Reporter: Yishan Jiang
>Priority: Major
>
> We enabled authentication and SASL encryption on many versions; for the 1.6.1 
> version we appended configuration like this:
> spark.local.dir /tmp/test-161
> spark.shuffle.service.enabled true
> *spark.authenticate true*
> *spark.authenticate.enableSaslEncryption true*
> *spark.network.sasl.serverAlwaysEncrypt true*
> spark.authenticate.secret e25d4369-bec3-4266-8fc5-fb6d4fcee66f
> spark.history.ui.port 18089
> spark.shuffle.service.port 7347
> spark.master.rest.port 6076
> spark.deploy.recoveryMode NONE
> spark.ssl.enabled true
> spark.executor.extraJavaOptions -Djava.security.egd=file:/dev/./urandom
> We ran a Spark example and the task failed with these exception messages:
> 17/08/22 03:56:52 INFO BlockManager: external shuffle service port = 7347
> 17/08/22 03:56:52 INFO BlockManagerMaster: Trying to register BlockManager
> 17/08/22 03:56:52 INFO sasl: DIGEST41:Unmatched MACs
> 17/08/22 03:56:52 WARN TransportChannelHandler: Exception in connection from 
> cws57n6.ma.platformlab.ibm.com/172.29.8.66:49394
> java.lang.IllegalArgumentException: Frame length should be positive: 
> -5594407078713290673   
> at 
> org.spark-project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
> at 
> org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:135)
> at 
> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:82)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:785)
> 17/08/22 03:56:52 ERROR TransportResponseHandler: Still have 1 requests 
> outstanding when connection from 
> cws57n6.ma.platformlab.ibm.com/172.29.8.66:49394 is closed
> 17/08/22 03:56:52 WARN NettyRpcEndpointRef: Error sending message [message = 
> RegisterBlockManager(BlockManagerId(fe9d31da-f70c-40a2-9032-05a5af4ba4c5, 
> cws58n1.ma.platformlab.ibm.com, 45852),2985295872,NettyRpcEn
> dpointRef(null))] in 1 attempts
> java.lang.IllegalArgumentException: Frame length should be positive: 
> -5594407078713290673
> at 
> org.spark-project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
> at 
> org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:135)
> at 
> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:82)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at 

[jira] [Commented] (SPARK-21827) Task fail due to executor exception when enable Sasl Encryption

2019-05-13 Thread JIRA


[ 
https://issues.apache.org/jira/browse/SPARK-21827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838396#comment-16838396
 ] 

Sébastien BARNOUD commented on SPARK-21827:
---

Hi,

I don't have the exact reason, but in a Spark job with 9 executor.cores and HBase Client 2.1.4:

-) with hbase.client.ipc.pool.type=RoundRobin (the default), I get the issue (*DIGEST41:Unmatched MACs*) frequently;

-) with hbase.client.ipc.pool.type=ThreadLocal, I never get it.

Oracle confirmed to me that the class SaslClient is not documented as thread safe, and that the application should take care of this.

Hoping this may help.
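
For anyone who wants to try the same workaround, here is a minimal, illustrative sketch (the property name is the one quoted above; how the connection is created is an assumption, not part of the original report):

{code:java}
// Illustrative only: switch the HBase client IPC pool type from the default
// RoundRobin to ThreadLocal before creating the connection used by the job.
org.apache.hadoop.conf.Configuration hbaseConf =
    org.apache.hadoop.hbase.HBaseConfiguration.create();
hbaseConf.set("hbase.client.ipc.pool.type", "ThreadLocal");
org.apache.hadoop.hbase.client.Connection connection =
    org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(hbaseConf);
{code}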

> Task fail due to executor exception when enable Sasl Encryption
> ---
>
> Key: SPARK-21827
> URL: https://issues.apache.org/jira/browse/SPARK-21827
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 1.6.1, 2.1.1, 2.2.0
> Environment: OS: RedHat 7.1 64bit
>Reporter: Yishan Jiang
>Priority: Major
>
> We enabled authentication and SASL encryption on many versions; for the 1.6.1 
> version we appended configuration like this:
> spark.local.dir /tmp/test-161
> spark.shuffle.service.enabled true
> *spark.authenticate true*
> *spark.authenticate.enableSaslEncryption true*
> *spark.network.sasl.serverAlwaysEncrypt true*
> spark.authenticate.secret e25d4369-bec3-4266-8fc5-fb6d4fcee66f
> spark.history.ui.port 18089
> spark.shuffle.service.port 7347
> spark.master.rest.port 6076
> spark.deploy.recoveryMode NONE
> spark.ssl.enabled true
> spark.executor.extraJavaOptions -Djava.security.egd=file:/dev/./urandom
> We ran a Spark example and the task failed with these exception messages:
> 17/08/22 03:56:52 INFO BlockManager: external shuffle service port = 7347
> 17/08/22 03:56:52 INFO BlockManagerMaster: Trying to register BlockManager
> 17/08/22 03:56:52 INFO sasl: DIGEST41:Unmatched MACs
> 17/08/22 03:56:52 WARN TransportChannelHandler: Exception in connection from 
> cws57n6.ma.platformlab.ibm.com/172.29.8.66:49394
> java.lang.IllegalArgumentException: Frame length should be positive: 
> -5594407078713290673   
> at 
> org.spark-project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
> at 
> org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:135)
> at 
> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:82)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:785)
> 17/08/22 03:56:52 ERROR TransportResponseHandler: Still have 1 requests 
> outstanding when connection from 
> cws57n6.ma.platformlab.ibm.com/172.29.8.66:49394 is closed
> 17/08/22 03:56:52 WARN NettyRpcEndpointRef: Error sending message [message = 
> RegisterBlockManager(BlockManagerId(fe9d31da-f70c-40a2-9032-05a5af4ba4c5, 
> cws58n1.ma.platformlab.ibm.com, 45852),2985295872,NettyRpcEn
> dpointRef(null))] in 1 attempts
> java.lang.IllegalArgumentException: Frame length should be positive: 
> -5594407078713290673
> at 
> org.spark-project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
> at 
> org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:135)
> at 
> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:82)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at 
> 

[jira] [Assigned] (SPARK-27687) Kafka consumer cache parameter rename and documentation

2019-05-13 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27687:


Assignee: (was: Apache Spark)

> Kafka consumer cache parameter rename and documentation
> ---
>
> Key: SPARK-27687
> URL: https://issues.apache.org/jira/browse/SPARK-27687
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Gabor Somogyi
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27687) Kafka consumer cache parameter rename and documentation

2019-05-13 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27687:


Assignee: Apache Spark

> Kafka consumer cache parameter rename and documentation
> ---
>
> Key: SPARK-27687
> URL: https://issues.apache.org/jira/browse/SPARK-27687
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Gabor Somogyi
>Assignee: Apache Spark
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27687) Kafka consumer cache parameter rename and documentation

2019-05-13 Thread Gabor Somogyi (JIRA)
Gabor Somogyi created SPARK-27687:
-

 Summary: Kafka consumer cache parameter rename and documentation
 Key: SPARK-27687
 URL: https://issues.apache.org/jira/browse/SPARK-27687
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 3.0.0
Reporter: Gabor Somogyi






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27648) In Spark2.4 Structured Streaming:The executor storage memory increasing over time

2019-05-13 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838337#comment-16838337
 ] 

Jungtaek Lim commented on SPARK-27648:
--

[~yy3b2007com]

I feel you're showing only part of your query: according to the query you posted there should be only one state operator, but the query information shows two state operators. Structured Streaming doesn't support multiple levels of aggregation, so except for stream-stream joins, in most cases there is a single state operator.

And your graph for state doesn't add the two state operators together; it seems to take only the smaller one. Here's the state size (estimation) from batch 580:

11367828611 bytes = 10.58711541 GiB
448870007 bytes = 0.418042771 GiB

That is about 11 GiB, instead of the less-than-500 MiB shown in your graph.
{noformat}
Query made progress: {
"id" : "954e29ff-361c-43c9-8e8e-8221baaa544d",
"runId" : "907471bd-393f-46eb-a70c-2679463668a1",
"name" : "queryMyHourAgg",
"timestamp" : "2019-04-29T05:14:09.353Z",
"batchId" : 580,
"numInputRows" : 242468,
"inputRowsPerSecond" : 268.5174110840897,
"processedRowsPerSecond" : 267.4765196326098,
"durationMs" : {
"addBatch" : 877717,
"getBatch" : 1,
"getEndOffset" : 0,
"queryPlanning" : 25483,
"setOffsetRange" : 3082,
"triggerExecution" : 906502,
"walCommit" : 113
},
"eventTime" : {
"avg" : "2019-04-29T05:15:31.598Z",
"max" : "2019-04-29T05:30:00.000Z",
"min" : "2019-04-29T05:00:00.000Z",
"watermark" : "2019-04-29T03:35:00.000Z"
},
"stateOperators" : [ {
"numRowsTotal" : 725830,
"numRowsUpdated" : 240062,
"memoryUsedBytes" : 11367828611,
"customMetrics" : {
"loadedMapCacheHitCount" : 914082,
"loadedMapCacheMissCount" : 196,
"stateOnCurrentVersionSizeBytes" : 8740427635
}
}, {
"numRowsTotal" : 1701992,
"numRowsUpdated" : 242182,
"memoryUsedBytes" : 448870007,
"customMetrics" : {
"loadedMapCacheHitCount" : 502667,
"loadedMapCacheMissCount" : 69,
"stateOnCurrentVersionSizeBytes" : 340077583
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[MY-Q1]]",
"startOffset" : {
"MY-Q1" : {
"17" : 163557365,
"8" : 162352319,
"11" : 161504220,
"2" : 162584734,
"5" : 163017552,
"14" : 165254414,
"13" : 163239536,
"4" : 163848418,
"16" : 16381,
"7" : 16368,
"1" : 165680313,
"10" : 163337267,
"19" : 160421019,
"18" : 161986305,
"9" : 163245093,
"3" : 161627253,
"12" : 162961867,
"15" : 161854220,
"6" : 162547056,
"0" : 161467759
}
},
"endOffset" : {
"MY-Q1" : {
"17" : 163569546,
"8" : 162364385,
"11" : 161516235,
"2" : 162596785,
"5" : 163029700,
"14" : 165266681,
"13" : 163251648,
"4" : 163860634,
"16" : 163345529,
"7" : 163699952,
"1" : 165692663,
"10" : 163349403,
"19" : 160432998,
"18" : 161998399,
"9" : 163257234,
"3" : 161639259,
"12" : 162974030,
"15" : 161866224,
"6" : 162559232,
"0" : 161479799
}
},
"numInputRows" : 242468,
"inputRowsPerSecond" : 268.5174110840897,
"processedRowsPerSecond" : 267.4765196326098
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider@4284be04"
}
}
{noformat}
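
As a side note for anyone reproducing this comparison, here is a minimal sketch (the class and variable names are made up for illustration) that sums memoryUsedBytes over *all* state operators of each progress event, so both operators are counted:

{code:java}
import org.apache.spark.sql.streaming.StateOperatorProgress;
import org.apache.spark.sql.streaming.StreamingQueryListener;

// Illustrative listener: adds up memoryUsedBytes across all state operators,
// e.g. 11367828611 + 448870007 bytes for batch 580 above (about 11 GiB total).
public class TotalStateMemoryListener extends StreamingQueryListener {
  @Override
  public void onQueryStarted(QueryStartedEvent event) { }

  @Override
  public void onQueryProgress(QueryProgressEvent event) {
    long totalBytes = 0L;
    for (StateOperatorProgress op : event.progress().stateOperators()) {
      totalBytes += op.memoryUsedBytes();
    }
    double totalGiB = totalBytes / (1024.0 * 1024.0 * 1024.0);
    System.out.println("batch " + event.progress().batchId()
        + " total state memory: " + totalGiB + " GiB");
  }

  @Override
  public void onQueryTerminated(QueryTerminatedEvent event) { }
}{code}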

> In Spark2.4 Structured Streaming:The executor storage memory increasing over 
> time
> -
>
> Key: SPARK-27648
> URL: https://issues.apache.org/jira/browse/SPARK-27648
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: tommy duan
>Priority: Major
> Attachments: houragg(1).out, houragg_filter.csv, 
> image-2019-05-09-17-51-14-036.png, image-2019-05-10-17-49-42-034.png
>
>
> *Spark Program Code Business:*
>  Read the topic on kafka, aggregate the stream data sources, and then output 
> it to another topic line of kafka.
> *Problem Description:*
>  *1) Using spark structured streaming in CDH environment (spark 2.2)*, memory 
> overflow problems often occur (because of too many versions of state stored 
> in memory, this bug has been modified in spark 2.4).
> {code:java}
> /spark-submit \
> --conf “spark.yarn.executor.memoryOverhead=4096M”
> --num-executors 15 \
> --executor-memory 3G \
> --executor-cores 2 \
> --driver-memory 6G \{code}
> {code}
> Executor memory exceptions occur when running with this submit resource under 
> SPARK 2.2 and the normal running time does not exceed one day.
> The solution is to set the executor memory larger than before 
> {code:java}
>  My spark-submit script is as follows:
> /spark-submit\
> conf "spark. yarn. executor. memoryOverhead = 4096M"
> num-executors 15\
> executor-memory 46G\
> executor-cores 3\
> driver-memory 6G\
> ...{code}
> In this case, the spark program can be guaranteed to run stably for a long 
> time, and the executor storage memory is less than 10M (it has been running 
> stably for more than 20 days).
> *2) From the upgrade information of Spark 2.4, we can see that the problem of 
> large memory consumption of state storage has been 

[jira] [Commented] (SPARK-27648) In Spark2.4 Structured Streaming:The executor storage memory increasing over time

2019-05-13 Thread tommy duan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838332#comment-16838332
 ] 

tommy duan commented on SPARK-27648:


[~gsomogyi] Every trigger in my program updates the broadcast object (it calls myBroad.unpersist(true) and then resets the value).

> In Spark2.4 Structured Streaming:The executor storage memory increasing over 
> time
> -
>
> Key: SPARK-27648
> URL: https://issues.apache.org/jira/browse/SPARK-27648
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: tommy duan
>Priority: Major
> Attachments: houragg(1).out, houragg_filter.csv, 
> image-2019-05-09-17-51-14-036.png, image-2019-05-10-17-49-42-034.png
>
>
> *Spark Program Code Business:*
>  Read the topic on kafka, aggregate the stream data sources, and then output 
> it to another topic line of kafka.
> *Problem Description:*
>  *1) Using spark structured streaming in CDH environment (spark 2.2)*, memory 
> overflow problems often occur (because of too many versions of state stored 
> in memory, this bug has been modified in spark 2.4).
> {code:java}
> /spark-submit \
> --conf “spark.yarn.executor.memoryOverhead=4096M”
> --num-executors 15 \
> --executor-memory 3G \
> --executor-cores 2 \
> --driver-memory 6G \{code}
> {code}
> Executor memory exceptions occur when running with this submit resource under 
> SPARK 2.2 and the normal running time does not exceed one day.
> The solution is to set the executor memory larger than before 
> {code:java}
>  My spark-submit script is as follows:
> /spark-submit\
> conf "spark. yarn. executor. memoryOverhead = 4096M"
> num-executors 15\
> executor-memory 46G\
> executor-cores 3\
> driver-memory 6G\
> ...{code}
> In this case, the spark program can be guaranteed to run stably for a long 
> time, and the executor storage memory is less than 10M (it has been running 
> stably for more than 20 days).
> *2) From the upgrade information of Spark 2.4, we can see that the problem of 
> large memory consumption of state storage has been solved in Spark 2.4.* 
>  So we upgraded spark to SPARK 2.4 under CDH, tried to run the spark program, 
> and found that the use of memory was reduced.
>  But a problem arises, as the running time increases, the storage memory of 
> executor is growing (see Executors - > Storage Memory from the Spark on Yarn 
> Resource Manager UI).
>  This program has been running for 14 days (under SPARK 2.2, running with 
> this submit resource, the normal running time is not more than one day, 
> Executor memory abnormalities will occur).
>  The script submitted by the program under spark2.4 is as follows:
> {code:java}
> /spark-submit \
>  --conf “spark.yarn.executor.memoryOverhead=4096M”
>  --num-executors 15 \
>  --executor-memory 3G \
>  --executor-cores 2 \
>  --driver-memory 6G 
> {code}
> Under Spark 2.4, I counted the size of executor memory as time went by during 
> the running of the spark program:
> |Run-time(hour)|Storage Memory size(MB)|Memory growth rate(MB/hour)|
> |23.5H|41.6MB/1.5GB|1.770212766|
> |108.4H|460.2MB/1.5GB|4.245387454|
> |131.7H|559.1MB/1.5GB|4.245254366|
> |135.4H|575MB/1.5GB|4.246676514|
> |153.6H|641.2MB/1.5GB|4.174479167|
> |219H|888.1MB/1.5GB|4.055251142|
> |263H|1126.4MB/1.5GB|4.282889734|
> |309H|1228.8MB/1.5GB|3.976699029|



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27648) In Spark2.4 Structured Streaming:The executor storage memory increasing over time

2019-05-13 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838324#comment-16838324
 ] 

Gabor Somogyi commented on SPARK-27648:
---

I've also taken a deeper look at the provided data and can confirm the state 
handling looks healthy. The possible leak is coming from a different source.

> In Spark2.4 Structured Streaming:The executor storage memory increasing over 
> time
> -
>
> Key: SPARK-27648
> URL: https://issues.apache.org/jira/browse/SPARK-27648
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: tommy duan
>Priority: Major
> Attachments: houragg(1).out, houragg_filter.csv, 
> image-2019-05-09-17-51-14-036.png, image-2019-05-10-17-49-42-034.png
>
>
> *Spark Program Code Business:*
>  Read the topic on kafka, aggregate the stream data sources, and then output 
> it to another topic line of kafka.
> *Problem Description:*
>  *1) Using spark structured streaming in CDH environment (spark 2.2)*, memory 
> overflow problems often occur (because of too many versions of state stored 
> in memory, this bug has been modified in spark 2.4).
> {code:java}
> /spark-submit \
> --conf “spark.yarn.executor.memoryOverhead=4096M”
> --num-executors 15 \
> --executor-memory 3G \
> --executor-cores 2 \
> --driver-memory 6G \{code}
> {code}
> Executor memory exceptions occur when running with this submit resource under 
> SPARK 2.2 and the normal running time does not exceed one day.
> The solution is to set the executor memory larger than before 
> {code:java}
>  My spark-submit script is as follows:
> /spark-submit\
> conf "spark. yarn. executor. memoryOverhead = 4096M"
> num-executors 15\
> executor-memory 46G\
> executor-cores 3\
> driver-memory 6G\
> ...{code}
> In this case, the spark program can be guaranteed to run stably for a long 
> time, and the executor storage memory is less than 10M (it has been running 
> stably for more than 20 days).
> *2) From the upgrade information of Spark 2.4, we can see that the problem of 
> large memory consumption of state storage has been solved in Spark 2.4.* 
>  So we upgraded spark to SPARK 2.4 under CDH, tried to run the spark program, 
> and found that the use of memory was reduced.
>  But a problem arises, as the running time increases, the storage memory of 
> executor is growing (see Executors - > Storage Memory from the Spark on Yarn 
> Resource Manager UI).
>  This program has been running for 14 days (under SPARK 2.2, running with 
> this submit resource, the normal running time is not more than one day, 
> Executor memory abnormalities will occur).
>  The script submitted by the program under spark2.4 is as follows:
> {code:java}
> /spark-submit \
>  --conf “spark.yarn.executor.memoryOverhead=4096M”
>  --num-executors 15 \
>  --executor-memory 3G \
>  --executor-cores 2 \
>  --driver-memory 6G 
> {code}
> Under Spark 2.4, I counted the size of executor memory as time went by during 
> the running of the spark program:
> |Run-time(hour)|Storage Memory size(MB)|Memory growth rate(MB/hour)|
> |23.5H|41.6MB/1.5GB|1.770212766|
> |108.4H|460.2MB/1.5GB|4.245387454|
> |131.7H|559.1MB/1.5GB|4.245254366|
> |135.4H|575MB/1.5GB|4.246676514|
> |153.6H|641.2MB/1.5GB|4.174479167|
> |219H|888.1MB/1.5GB|4.055251142|
> |263H|1126.4MB/1.5GB|4.282889734|
> |309H|1228.8MB/1.5GB|3.976699029|



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27546) Should repalce DateTimeUtils#defaultTimeZoneuse with sessionLocalTimeZone

2019-05-13 Thread Jiatao Tao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16823808#comment-16823808
 ] 

Jiatao Tao edited comment on SPARK-27546 at 5/13/19 7:18 AM:
-

So in my opinion, we should change DateTimeUtils#defaultTimeZone to the timezone configured by "spark.sql.session.timeZone".

And I would like to take this JIRA and provide a fix if the community thinks this is indeed a problem.


was (Author: aron.tao):
So in my opinion, we should change DateTimeUtils#defaultTimeZone to the timezone configured by "spark.sql.session.timeZone".

And I would like to take this JIRA if the community thinks this is indeed a problem.

> Should repalce DateTimeUtils#defaultTimeZoneuse with sessionLocalTimeZone
> -
>
> Key: SPARK-27546
> URL: https://issues.apache.org/jira/browse/SPARK-27546
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.1
>Reporter: Jiatao Tao
>Priority: Minor
> Attachments: image-2019-04-23-08-10-00-475.png, 
> image-2019-04-23-08-10-50-247.png
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27668) File source V2: support reporting statistics

2019-05-13 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-27668.
-
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 24571
[https://github.com/apache/spark/pull/24571]

> File source V2: support reporting statistics
> 
>
> Key: SPARK-27668
> URL: https://issues.apache.org/jira/browse/SPARK-27668
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Gengliang Wang
>Assignee: Gengliang Wang
>Priority: Major
> Fix For: 3.0.0
>
>
> In File source V1, the statistics of `HadoopFsRelation` is `compressionFactor 
> * sizeInBytesOfAllFiles`.
> To follow it, we can implement the interface SupportsReportStatistics in 
> FileScan and report the same statistics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27668) File source V2: support reporting statistics

2019-05-13 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-27668:
---

Assignee: Gengliang Wang

> File source V2: support reporting statistics
> 
>
> Key: SPARK-27668
> URL: https://issues.apache.org/jira/browse/SPARK-27668
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Gengliang Wang
>Assignee: Gengliang Wang
>Priority: Major
>
> In File source V1, the statistics of `HadoopFsRelation` is `compressionFactor 
> * sizeInBytesOfAllFiles`.
> To follow it, we can implement the interface SupportsReportStatistics in 
> FileScan and report the same statistics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org