[jira] [Commented] (SPARK-32041) Exchange reuse won't work in cases when DPP, subqueries are involved

2020-06-20 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17141306#comment-17141306
 ] 

Apache Spark commented on SPARK-32041:
--

User 'prakharjain09' has created a pull request for this issue:
https://github.com/apache/spark/pull/28881

> Exchange reuse won't work in cases when DPP, subqueries are involved
> 
>
> Key: SPARK-32041
> URL: https://issues.apache.org/jira/browse/SPARK-32041
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.6, 3.0.0
>Reporter: Prakhar Jain
>Priority: Major
>

[jira] [Assigned] (SPARK-32041) Exchange reuse won't work in cases when DPP, subqueries are involved

2020-06-20 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-32041:


Assignee: (was: Apache Spark)

> Exchange reuse won't work in cases when DPP, subqueries are involved
> 
>
> Key: SPARK-32041
> URL: https://issues.apache.org/jira/browse/SPARK-32041
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.6, 3.0.0
>Reporter: Prakhar Jain
>Priority: Major
>

[jira] [Commented] (SPARK-32041) Exchange reuse won't work in cases when DPP, subqueries are involved

2020-06-20 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17141304#comment-17141304
 ] 

Apache Spark commented on SPARK-32041:
--

User 'prakharjain09' has created a pull request for this issue:
https://github.com/apache/spark/pull/28881

> Exchange reuse won't work in cases when DPP, subqueries are involved
> 
>
> Key: SPARK-32041
> URL: https://issues.apache.org/jira/browse/SPARK-32041
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.6, 3.0.0
>Reporter: Prakhar Jain
>Priority: Major
>

[jira] [Assigned] (SPARK-32041) Exchange reuse won't work in cases when DPP, subqueries are involved

2020-06-20 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-32041:


Assignee: Apache Spark

> Exchange reuse won't work in cases when DPP, subqueries are involved
> 
>
> Key: SPARK-32041
> URL: https://issues.apache.org/jira/browse/SPARK-32041
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.6, 3.0.0
>Reporter: Prakhar Jain
>Assignee: Apache Spark
>Priority: Major
>

[jira] [Created] (SPARK-32041) Exchange reuse won't work in cases when DPP, subqueries are involved

2020-06-20 Thread Prakhar Jain (Jira)
Prakhar Jain created SPARK-32041:


 Summary: Exchange reuse won't work in cases when DPP, subqueries 
are involved
 Key: SPARK-32041
 URL: https://issues.apache.org/jira/browse/SPARK-32041
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0, 2.4.6
Reporter: Prakhar Jain


When an Exchange node is repeated at multiple places in the physical plan, and 
that exchange has a DPP subquery filter, ReuseExchange does not work for such an 
Exchange, and separate stages are launched to compute the same thing.

Example:
{noformat}
// generate data
val factData = (1 to 100).map(i => (i%5, i%20, i))
factData.toDF("store_id", "product_id", "units_sold")
  .write
  .partitionBy("store_id")
  .format("parquet")
  .saveAsTable("fact_stats")

val dimData = Seq[(Int, String, String)](
  (1, "AU", "US"),
  (2, "CA", "US"),
  (3, "KA", "IN"),
  (4, "DL", "IN"),
  (5, "GA", "PA"))

dimData.toDF("store_id", "state_province", "country")
  .write
  .format("parquet")
  .saveAsTable("dim_stats")

sql("ANALYZE TABLE fact_stats COMPUTE STATISTICS FOR COLUMNS store_id")
sql("ANALYZE TABLE dim_stats COMPUTE STATISTICS FOR COLUMNS store_id")

// Set Configs
spark.sql("set 
spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly=true")
spark.sql("set spark.sql.autoBroadcastJoinThreshold=1000")

val query = """
With view1 as (
  SELECT product_id, f.store_id
  FROM fact_stats f JOIN dim_stats
  ON f.store_id = dim_stats.store_id WHERE dim_stats.country = 'IN')
SELECT * FROM view1 v1 join view1 v2 WHERE v1.product_id = v2.product_id
"""
val df = spark.sql(query)
println(df.queryExecution.executedPlan)

{noformat}
{noformat}
Plan:
*(7) SortMergeJoin [product_id#1968], [product_id#2060], Inner
:- *(3) Sort [product_id#1968 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(product_id#1968, 5), true, [id=#1140]
:     +- *(2) Project [product_id#1968, store_id#1970]
:        +- *(2) BroadcastHashJoin [store_id#1970], [store_id#1971], Inner, BuildRight
:           :- *(2) Project [product_id#1968, store_id#1970]
:           :  +- *(2) Filter isnotnull(product_id#1968)
:           :     +- *(2) ColumnarToRow
:           :        +- FileScan parquet default.fact_stats[product_id#1968,store_id#1970] Batched: true, DataFilters: [isnotnull(product_id#1968)], Format: Parquet, Location: InMemoryFileIndex[file:/home/prakhar/src/os/1_spark/sql/core/spark-warehouse/org.apache.spark.sql..., PartitionFilters: [isnotnull(store_id#1970), dynamicpruningexpression(store_id#1970 IN dynamicpruning#2067)], PushedFilters: [IsNotNull(product_id)], ReadSchema: struct
:           :              +- SubqueryBroadcast dynamicpruning#2067, 0, [store_id#1971], [id=#1131]
:           :                 +- ReusedExchange [store_id#1971], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#1021]
:           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#1021]
:              +- *(1) Project [store_id#1971]
:                 +- *(1) Filter ((isnotnull(country#1973) AND (country#1973 = IN)) AND isnotnull(store_id#1971))
:                    +- *(1) ColumnarToRow
:                       +- FileScan parquet default.dim_stats[store_id#1971,country#1973] Batched: true, DataFilters: [isnotnull(country#1973), (country#1973 = IN), isnotnull(store_id#1971)], Format: Parquet, Location: InMemoryFileIndex[file:/home/prakhar/src/os/1_spark/sql/core/spark-warehouse/org.apache.spark.sql..., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,IN), IsNotNull(store_id)], ReadSchema: struct
+- *(6) Sort [product_id#2060 ASC NULLS FIRST], false, 0
   +- ReusedExchange [product_id#2060, store_id#2062], Exchange hashpartitioning(product_id#1968, 5), true, [id=#1026]
{noformat}
Issue:
 Note the last line of the plan. It is a ReusedExchange pointing to id=1026, but there 
is no Exchange node in the plan with ID 1026. The ReusedExchange node points to an 
incorrect child node (1026 instead of 1140), so exchange reuse does not actually 
happen in this query.
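For intuition, here is a minimal, hypothetical sketch (not Spark's actual ReuseExchange rule; all names below are illustrative) of reuse keyed on a canonicalized plan. It shows why a ReusedExchange that records a stale node id (1026) instead of the id of the exchange that actually remains in the plan (1140) defeats reuse:
{code:scala}
// Illustrative only: deduplicate exchanges by their canonical form. The first
// occurrence is kept; later identical exchanges become a reference to it. If that
// reference records an id that no longer exists in the final plan, reuse silently
// fails and the exchange is computed twice.
case class Exchange(id: Int, canonical: String)
case class Reused(pointsTo: Int)

def reuseExchanges(plan: Seq[Exchange]): Seq[Either[Exchange, Reused]] = {
  val seen = scala.collection.mutable.Map.empty[String, Exchange]
  plan.map { e =>
    seen.get(e.canonical) match {
      case Some(first) => Right(Reused(first.id)) // must point at a node still in the plan
      case None =>
        seen(e.canonical) = e
        Left(e)
    }
  }
}
{code}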

Another query where the issue is caused by ReuseSubquery:
{noformat}
spark.sql("set spark.sql.autoBroadcastJoinThreshold=-1")

val query1 = """
  | With view1 as (
  |   SELECT product_id, units_sold
  |   FROM fact_stats
  |   WHERE store_id = (SELECT max(store_id) FROM dim_stats)
  | and units_sold = 2
  | ), view2 as (
  |   SELECT product_id, units_sold
  |   FROM fact_stats
  |   WHERE store_id = (SELECT 

[jira] [Commented] (SPARK-32039) Unable to Set `spark.ui.port` configuration in Yarn Cluster Mode

2020-06-20 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17141291#comment-17141291
 ] 

Apache Spark commented on SPARK-32039:
--

User 'rajatahujaatinmobi' has created a pull request for this issue:
https://github.com/apache/spark/pull/28880

> Unable to Set `spark.ui.port` configuration in Yarn Cluster Mode
> 
>
> Key: SPARK-32039
> URL: https://issues.apache.org/jira/browse/SPARK-32039
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: rajat
>Priority: Critical
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The Spark Web UI port in YARN cluster mode always gets a random number because we 
> disable the spark.ui.port configuration by always setting this property to 0.
> There are use cases where we need to run the Web UI port within a specified 
> range because of security concerns, and we are unable to do so with the current 
> configuration.
>  
> [https://github.com/apache/spark/blob/842b1dcdff0ecab4af9f292c2ff7b2b9ae1ac40a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala#L216]
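For illustration only, a hedged sketch of the desired behaviour (an explicit starting UI port plus a retry range instead of a forced random port); spark.ui.port and spark.port.maxRetries are existing Spark settings, but whether YARN cluster mode honours spark.ui.port is exactly what this issue is about:
{code:scala}
import org.apache.spark.sql.SparkSession

// Illustrative values: ask for a fixed starting port and let Spark retry the next
// ports if it is taken, instead of the ApplicationMaster forcing spark.ui.port=0.
val spark = SparkSession.builder()
  .appName("ui-port-range-example")
  .config("spark.ui.port", "4050")        // desired first port to try
  .config("spark.port.maxRetries", "16")  // allows 4050..4065 to be attempted
  .getOrCreate()
{code}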






[jira] [Commented] (SPARK-32039) Unable to Set `spark.ui.port` configuration in Yarn Cluster Mode

2020-06-20 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17141290#comment-17141290
 ] 

Apache Spark commented on SPARK-32039:
--

User 'rajatahujaatinmobi' has created a pull request for this issue:
https://github.com/apache/spark/pull/28880

> Unable to Set `spark.ui.port` configuration in Yarn Cluster Mode
> 
>
> Key: SPARK-32039
> URL: https://issues.apache.org/jira/browse/SPARK-32039
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: rajat
>Priority: Critical
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>






[jira] [Commented] (SPARK-32016) Why spark does not preserve the original timestamp format while writing dataset to file or hdfs

2020-06-20 Thread Hyukjin Kwon (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17141265#comment-17141265
 ] 

Hyukjin Kwon commented on SPARK-32016:
--

I am not very clear on what original format you mean. Once it is parsed from CSV into 
the JVM, there is no original format; there are only timestamp instances on the JVM.
If you want to keep the original format, you might have to deal with the values as 
strings as they are.
The workarounds you mentioned look fair enough to me.

> Why spark does not preserve the original timestamp format while writing 
> dataset to file or hdfs
> ---
>
> Key: SPARK-32016
> URL: https://issues.apache.org/jira/browse/SPARK-32016
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 2.3.0, 2.4.0, 2.4.3
> Environment: Apache spark 2.3 and spark 2.4. May happen in other as 
> well
>Reporter: Anupam Jain
>Priority: Major
>
> I want to write a Spark dataset that has a few timestamp columns into HDFS.
>  * While reading, by default Spark infers the data as timestamp if the format is 
> similar to "*yyyy-MM-dd HH:mm:ss*".
>  * But while writing to a file, it saves the values in the format 
> "*yyyy-MM-dd'T'HH:mm:ss.SSSXXX*".
>  * For example, the source value *2020-06-01 12:10:03* is written as 
> *2020-06-01T12:10:03.000+05:30*.
>  * The expectation is to preserve the original timestamp format before writing.
> Why does Spark not preserve the original timestamp format while writing a 
> dataset to a file or HDFS?
> Using simple Java code like:
> {color:#4c9aff}Dataset ds = 
> spark.read().format("csv").option("path",the_path).option("inferSchema","true").load();
>  {color}
> {color:#4c9aff}ds.write().format("csv").save("path_to_save");{color}
> I know the workarounds:
>  * Use the "*timestampFormat*" option before save.
>  * But it may have a performance overhead, and it is also global for all columns.
>  * So let's say there are 2 columns with the formats "*yyyy-MM-dd HH:mm:ss*" and 
> "*yyyy-MM-dd HH*". Both can be inferred as timestamp by default, but both are output 
> in a single specified "timestampFormat".
>  * Another way is to use date_format(col, format). But that also may have a 
> performance overhead and adds operations to apply, whereas I expect Spark 
> to preserve the original format.
> Tried with Spark 2.3 and Spark 2.4.
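A minimal sketch (Scala, with hypothetical paths, assuming the usual {{spark}} session) of the "timestampFormat" workaround mentioned above; note that the option applies globally to all timestamp columns, which is the limitation described in this issue:
{code:scala}
// Read the CSV with schema inference, then write it back while rendering timestamps
// in the same layout as the source data. Paths are placeholders.
val ds = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/path/to/input")

ds.write
  .option("header", "true")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") // applies to every timestamp column
  .csv("/path/to/output")
{code}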






[jira] [Resolved] (SPARK-32022) Can many executors share one gpu for spark3.0?

2020-06-20 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-32022.
--
Resolution: Invalid

> Can many executors share one gpu for spark3.0?
> --
>
> Key: SPARK-32022
> URL: https://issues.apache.org/jira/browse/SPARK-32022
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core
>Affects Versions: 3.0.0
> Environment: spark3.0 + Hadoop3 + yarn cluster 
>Reporter: liucheng
>Priority: Major
>
> Hi, I want to run many executors (for example, 2) on a server with only one 
> GPU card. I tested Spark 3.0 in YARN cluster mode with the following config:
> spark-shell  --conf spark.executor.resource.gpu.amount=0.5 
>  
> Then, I see the following errors:
>  
> 20/06/18 16:24:46 [main] {color:#FF}ERROR{color} SparkContext: Error 
> initializing SparkContext.
> java.lang.NumberFormatException:{color:#FF} For input string: "0.5"{color}
>  at 
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>  at java.lang.Integer.parseInt(Integer.java:580)
>  at java.lang.Integer.parseInt(Integer.java:615)
>  at scala.collection.immutable.StringLike.toInt(StringLike.scala:304)
>  at scala.collection.immutable.StringLike.toInt$(StringLike.scala:304)
>  at scala.collection.immutable.StringOps.toInt(StringOps.scala:33)
>  at 
> org.apache.spark.resource.ResourceUtils$.parseResourceRequest(ResourceUtils.scala:142)
>  at 
> org.apache.spark.resource.ResourceUtils$.$anonfun$parseAllResourceRequests$1(ResourceUtils.scala:159)
>  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>  at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:75)
>  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>  at 
> org.apache.spark.resource.ResourceUtils$.parseAllResourceRequests(ResourceUtils.scala:159)
>  at 
> org.apache.spark.SparkContext$.checkResourcesPerTask$1(SparkContext.scala:2773)
>  at 
> org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2921)
>  at org.apache.spark.SparkContext.(SparkContext.scala:528)
>  at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2555)
>  at 
> org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$1(SparkSession.scala:931)
>  at scala.Option.getOrElse(Option.scala:189)
>  
> 
>  
> My question: for Spark 3.0, can one GPU card support many executors?
> Thank you!
>  
>  
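For reference, a hypothetical sketch of a configuration that parses under Spark 3.0's rules: executor-level resource amounts must be whole numbers, while the task-level amount may be fractional so that several tasks share one GPU. The discovery script path is an assumption, and whether multiple executors can share one physical GPU remains the question of this issue:
{code:scala}
import org.apache.spark.sql.SparkSession

// Illustrative configuration: one GPU per executor, two tasks sharing that GPU.
val spark = SparkSession.builder()
  .config("spark.executor.resource.gpu.amount", "1")   // must be an integer
  .config("spark.task.resource.gpu.amount", "0.5")     // two tasks per GPU
  .config("spark.executor.resource.gpu.discoveryScript", "/opt/spark/getGpus.sh")
  .getOrCreate()
{code}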






[jira] [Commented] (SPARK-32022) Can many executors share one gpu for spark3.0?

2020-06-20 Thread Hyukjin Kwon (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17141261#comment-17141261
 ] 

Hyukjin Kwon commented on SPARK-32022:
--

Let's loop in the mailing list when it's a question, instead of filing an issue 
here.

> Can many executors share one gpu for spark3.0?
> --
>
> Key: SPARK-32022
> URL: https://issues.apache.org/jira/browse/SPARK-32022
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core
>Affects Versions: 3.0.0
> Environment: spark3.0 + Hadoop3 + yarn cluster 
>Reporter: liucheng
>Priority: Major
>






[jira] [Updated] (SPARK-32040) Idle cores not being allocated

2020-06-20 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-32040:
-
Description: 
*Background:*

I have a cluster (2.4.5) using standalone mode orchestrated by Nomad jobs 
running on EC2. We deploy a Scala web server as a long running jar via 
{{spark-submit}} in client mode. Sometimes we get into a state where the 
application ends up with 0 cores due to our in-house autoscaler scaling down 
and killing workers without checking if any of the cores in the worker were 
allocated to existing applications. These applications then end up with 0 
cores, even though there are healthy workers in the cluster. 

However, only if I submit a new application or register a new worker in the 
cluster will the master finally reallocate cores to the 
application. This is problematic, because the long-running 0-core 
application is stuck.

Could this be related to the fact that {{schedule()}} is only triggered by new 
workers / new applications as commented here? 
https://github.com/apache/spark/blob/v2.4.5/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L721-L724

If that is the case, should the application be calling {{schedule()}} when 
removing workers after calling {{timeOutWorkers()}}? 
https://github.com/apache/spark/blob/v2.4.5/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L417

The downscaling causes me to see this in my logs, so I am fairly certain 
{{timeOutWorkers()}} is being called: 

{code}
20/06/08 11:40:56 INFO Master: Application app-20200608114056-0006 requested 
to set total executors to 1. 
20/06/08 11:40:56 INFO Master: Launching executor app-20200608114056-0006/0 
on worker worker-20200608113523--7077 
20/06/08 11:41:44 WARN Master: Removing 
worker-20200608113523--7077 because we got no heartbeat in 60 
seconds 
20/06/08 11:41:44 INFO Master: Removing worker 
worker-20200608113523--7077 on :7077 
20/06/08 11:41:44 INFO Master: Telling app of lost executor: 0 
20/06/08 11:41:44 INFO Master: Telling app of lost worker: 
worker-20200608113523-10.158.242.213-7077 
{code}
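A hypothetical sketch of the suggestion above (illustrative names only, not the actual Master.scala code):
{code:scala}
// Illustrative only: after expiring dead workers, re-run scheduling so that
// applications left with 0 cores can pick up cores from the remaining healthy
// workers, instead of waiting for a new worker or application to trigger it.
def onWorkerTimeout(deadWorkers: Seq[WorkerInfo]): Unit = {
  deadWorkers.foreach(w => removeWorker(w, "no heartbeat received"))
  schedule()
}
{code}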


  was:
*Background:*

I have a cluster (2.4.5) using standalone mode orchestrated by Nomad jobs 
running on EC2. We deploy a Scala web server as a long running jar via 
{{spark-submit}} in client mode. Sometimes we get into a state where the 
application ends up with 0 cores due to our in-house autoscaler scaling down 
and killing workers without checking if any of the cores in the worker were 
allocated to existing applications. These applications then end up with 0 
cores, even though there are healthy workers in the cluster. 

However, if i submit a new application or register a new worker in the 
cluster, only then will the master finally reallocate cores to the 
application. This is problematic, because the long running 0 core 
application is stuck. 

Could this be related to the fact that {{schedule()}} is only triggered by new 
workers / new applications as commented here? 
https://github.com/apache/spark/blob/v2.4.5/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L721-L724

If that is the case, should the application be calling {{schedule()}} when 
removing workers after calling {{timeOutWorkers()}}? 
https://github.com/apache/spark/blob/v2.4.5/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L417

The downscaling causes me to see this in my logs, so i am fairly certain 
{[timeOutWorkers()}} is being called: 

{code}
20/06/08 11:40:56 INFO Master: Application app-20200608114056-0006 requested 
to set total executors to 1. 
20/06/08 11:40:56 INFO Master: Launching executor app-20200608114056-0006/0 
on worker worker-20200608113523--7077 
20/06/08 11:41:44 WARN Master: Removing 
worker-20200608113523--7077 because we got no heartbeat in 60 
seconds 
20/06/08 11:41:44 INFO Master: Removing worker 
worker-20200608113523--7077 on :7077 
20/06/08 11:41:44 INFO Master: Telling app of lost executor: 0 
20/06/08 11:41:44 INFO Master: Telling app of lost worker: 
worker-20200608113523-10.158.242.213-7077 
{code}



> Idle cores not being allocated
> --
>
> Key: SPARK-32040
> URL: https://issues.apache.org/jira/browse/SPARK-32040
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.4.5
>Reporter: t oo
>Priority: Major
>

[jira] [Updated] (SPARK-32040) Idle cores not being allocated

2020-06-20 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-32040:
-
Description: 
*Background:*

I have a cluster (2.4.5) using standalone mode orchestrated by Nomad jobs 
running on EC2. We deploy a Scala web server as a long running jar via 
{{spark-submit}} in client mode. Sometimes we get into a state where the 
application ends up with 0 cores due to our in-house autoscaler scaling down 
and killing workers without checking if any of the cores in the worker were 
allocated to existing applications. These applications then end up with 0 
cores, even though there are healthy workers in the cluster. 

However, if i submit a new application or register a new worker in the 
cluster, only then will the master finally reallocate cores to the 
application. This is problematic, because the long running 0 core 
application is stuck. 

Could this be related to the fact that {{schedule()}} is only triggered by new 
workers / new applications as commented here? 
https://github.com/apache/spark/blob/v2.4.5/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L721-L724

If that is the case, should the application be calling {{schedule()}} when 
removing workers after calling {{timeOutWorkers()}}? 
https://github.com/apache/spark/blob/v2.4.5/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L417

The downscaling causes me to see this in my logs, so i am fairly certain 
{[timeOutWorkers()}} is being called: 

{code}
20/06/08 11:40:56 INFO Master: Application app-20200608114056-0006 requested 
to set total executors to 1. 
20/06/08 11:40:56 INFO Master: Launching executor app-20200608114056-0006/0 
on worker worker-20200608113523--7077 
20/06/08 11:41:44 WARN Master: Removing 
worker-20200608113523--7077 because we got no heartbeat in 60 
seconds 
20/06/08 11:41:44 INFO Master: Removing worker 
worker-20200608113523--7077 on :7077 
20/06/08 11:41:44 INFO Master: Telling app of lost executor: 0 
20/06/08 11:41:44 INFO Master: Telling app of lost worker: 
worker-20200608113523-10.158.242.213-7077 
{code}


  was:
Background: 
I have a cluster (2.4.5) using standalone mode orchestrated by Nomad jobs 
running on EC2. We deploy a Scala web server as a long running jar via 
`spark-submit` in client mode. Sometimes we get into a state where the 
application ends up with 0 cores due to our in-house autoscaler scaling down 
and killing workers without checking if any of the cores in the worker were 
allocated to existing applications. These applications then end up with 0 
cores, even though there are healthy workers in the cluster. 

However, if i submit a new application or register a new worker in the 
cluster, only then will the master finally reallocate cores to the 
application. This is problematic, because the long running 0 core 
application is stuck. 

Could this be related to the fact that `schedule()` is only triggered by new 
workers / new applications as commented here? 
[https://github.com/apache/spark/blob/v2.4.5/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L721-L724]

If that is the case, should the application be calling `schedule()` when 
removing workers after calling `timeOutWorkers()`? 
[https://github.com/apache/spark/blob/v2.4.5/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L417]

The downscaling causes me to see this in my logs, so i am fairly certain 
`timeOutWorkers()` is being called: 
``` 
20/06/08 11:40:56 INFO Master: Application app-20200608114056-0006 requested 
to set total executors to 1. 
20/06/08 11:40:56 INFO Master: Launching executor app-20200608114056-0006/0 
on worker worker-20200608113523--7077 
20/06/08 11:41:44 WARN Master: Removing 
worker-20200608113523--7077 because we got no heartbeat in 60 
seconds 
20/06/08 11:41:44 INFO Master: Removing worker 
worker-20200608113523--7077 on :7077 
20/06/08 11:41:44 INFO Master: Telling app of lost executor: 0 
20/06/08 11:41:44 INFO Master: Telling app of lost worker: 
worker-20200608113523-10.158.242.213-7077 
```


> Idle cores not being allocated
> --
>
> Key: SPARK-32040
> URL: https://issues.apache.org/jira/browse/SPARK-32040
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.4.5
>Reporter: t oo
>Priority: Major
>

[jira] [Commented] (SPARK-32010) Thread leaks in pinned thread mode

2020-06-20 Thread Hyukjin Kwon (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17141259#comment-17141259
 ] 

Hyukjin Kwon commented on SPARK-32010:
--

cc [~irashid] FYI

> Thread leaks in pinned thread mode
> --
>
> Key: SPARK-32010
> URL: https://issues.apache.org/jira/browse/SPARK-32010
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Affects Versions: 3.1.0
>Reporter: Hyukjin Kwon
>Assignee: Hyukjin Kwon
>Priority: Major
>
> SPARK-22340 introduced a pinned thread mode which guarantees that each Python 
> thread is synced with a JVM thread.
> However, it looks like the JVM threads are not finished even when the Python 
> thread is finished. It can be debugged via YourKit by running multiple jobs 
> with multiple threads at the same time.
> Easiest reproducer is:
> {code}
> PYSPARK_PIN_THREAD=true ./bin/pyspark
> {code}
> {code}
> >>> from threading import Thread
> >>> Thread(target=lambda: spark.range(1000).collect()).start()
> >>> Thread(target=lambda: spark.range(1000).collect()).start()
> >>> Thread(target=lambda: spark.range(1000).collect()).start()
> >>> spark._jvm._gateway_client.deque
> deque([, 
> , 
> , 
> , 
> ])
> >>> Thread(target=lambda: spark.range(1000).collect()).start()
> >>> spark._jvm._gateway_client.deque
> deque([, 
> , 
> , 
> , 
> , 
> ])
> {code}
> The connection doesn't get closed, and it holds the JVM thread running.






[jira] [Assigned] (SPARK-27702) Allow using some alternatives for service accounts

2020-06-20 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-27702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun reassigned SPARK-27702:
-

Assignee: Udbhav Agrawal

> Allow using some alternatives for service accounts
> --
>
> Key: SPARK-27702
> URL: https://issues.apache.org/jira/browse/SPARK-27702
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes, Spark Core
>Affects Versions: 3.1.0
>Reporter: Udbhav Agrawal
>Assignee: Udbhav Agrawal
>Priority: Minor
>
> Some cloud environments use alternatives to the Kubernetes service account. 
> It would be more flexible if we allowed users to use such an alternative.
> After SPARK-25887 a user can use a kubeconfig file to get authenticated and hence 
> can also create a secret and mount it to the pod using a pod template.
>  






[jira] [Resolved] (SPARK-27702) Allow using some alternatives for service accounts

2020-06-20 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-27702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-27702.
---
Fix Version/s: 3.1.0
   Resolution: Fixed

Issue resolved by pull request 24601
[https://github.com/apache/spark/pull/24601]

> Allow using some alternatives for service accounts
> --
>
> Key: SPARK-27702
> URL: https://issues.apache.org/jira/browse/SPARK-27702
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes, Spark Core
>Affects Versions: 3.1.0
>Reporter: Udbhav Agrawal
>Assignee: Udbhav Agrawal
>Priority: Minor
> Fix For: 3.1.0
>
>
> Some cloud environments use alternatives to the Kubernetes service account. 
> It would be more flexible if we allowed users to use such an alternative.
> After SPARK-25887 a user can use a kubeconfig file to get authenticated and hence 
> can also create a secret and mount it to the pod using a pod template.
>  






[jira] [Updated] (SPARK-24266) Spark client terminates while driver is still running

2020-06-20 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-24266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-24266:
-
Target Version/s: 3.1.0, 2.4.7

> Spark client terminates while driver is still running
> -
>
> Key: SPARK-24266
> URL: https://issues.apache.org/jira/browse/SPARK-24266
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes, Spark Core
>Affects Versions: 2.3.0, 3.0.0
>Reporter: Chun Chen
>Priority: Major
>
> {code}
> Warning: Ignoring non-spark config property: Default=system properties 
> included when running spark-submit.
> 18/05/11 14:50:12 WARN Config: Error reading service account token from: 
> [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
> 18/05/11 14:50:12 INFO HadoopStepsOrchestrator: Hadoop Conf directory: 
> Some(/data/tesla/spark-2.2.0-k8s-0.5.0-bin-2.7.3/hadoop-conf)
> 18/05/11 14:50:15 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> 18/05/11 14:50:15 WARN DomainSocketFactory: The short-circuit local reads 
> feature cannot be used because libhadoop cannot be loaded.
> 18/05/11 14:50:16 INFO HadoopConfBootstrapImpl: HADOOP_CONF_DIR defined. 
> Mounting Hadoop specific files
> 18/05/11 14:50:17 INFO LoggingPodStatusWatcherImpl: State changed, new state: 
>pod name: spark-64-293-980-1526021412180-driver
>namespace: tione-603074457
>labels: network -> FLOATINGIP, spark-app-selector -> 
> spark-2843da19c690485b93780ad7992a101e, spark-role -> driver
>pod uid: 90558303-54e7-11e8-9e64-525400da65d8
>creation time: 2018-05-11T06:50:17Z
>service account name: default
>volumes: spark-local-dir-0-spark-local, spark-init-properties, 
> download-jars-volume, download-files, spark-init-secret, hadoop-properties, 
> default-token-xvjt9
>node name: N/A
>start time: N/A
>container images: N/A
>phase: Pending
>status: []
> 18/05/11 14:50:17 INFO LoggingPodStatusWatcherImpl: State changed, new state: 
>pod name: spark-64-293-980-1526021412180-driver
>namespace: tione-603074457
>labels: network -> FLOATINGIP, spark-app-selector -> 
> spark-2843da19c690485b93780ad7992a101e, spark-role -> driver
>pod uid: 90558303-54e7-11e8-9e64-525400da65d8
>creation time: 2018-05-11T06:50:17Z
>service account name: default
>volumes: spark-local-dir-0-spark-local, spark-init-properties, 
> download-jars-volume, download-files, spark-init-secret, hadoop-properties, 
> default-token-xvjt9
>node name: tbds-100-98-45-69
>start time: N/A
>container images: N/A
>phase: Pending
>status: []
> 18/05/11 14:50:18 INFO LoggingPodStatusWatcherImpl: State changed, new state: 
>pod name: spark-64-293-980-1526021412180-driver
>namespace: tione-603074457
>labels: network -> FLOATINGIP, spark-app-selector -> 
> spark-2843da19c690485b93780ad7992a101e, spark-role -> driver
>pod uid: 90558303-54e7-11e8-9e64-525400da65d8
>creation time: 2018-05-11T06:50:17Z
>service account name: default
>volumes: spark-local-dir-0-spark-local, spark-init-properties, 
> download-jars-volume, download-files, spark-init-secret, hadoop-properties, 
> default-token-xvjt9
>node name: tbds-100-98-45-69
>start time: 2018-05-11T06:50:17Z
>container images: docker.oa.com:8080/gaia/spark-driver-cos:20180503_9
>phase: Pending
>status: [ContainerStatus(containerID=null, 
> image=docker.oa.com:8080/gaia/spark-driver-cos:20180503_9, imageID=, 
> lastState=ContainerState(running=null, terminated=null, waiting=null, 
> additionalProperties={}), name=spark-kubernetes-driver, ready=false, 
> restartCount=0, state=ContainerState(running=null, terminated=null, 
> waiting=ContainerStateWaiting(message=null, reason=PodInitializing, 
> additionalProperties={}), additionalProperties={}), additionalProperties={})]
> 18/05/11 14:50:19 INFO Client: Waiting for application spark-64-293-980 to 
> finish...
> 18/05/11 14:50:25 INFO LoggingPodStatusWatcherImpl: State changed, new state: 
>pod name: spark-64-293-980-1526021412180-driver
>namespace: tione-603074457
>labels: network -> FLOATINGIP, spark-app-selector -> 
> spark-2843da19c690485b93780ad7992a101e, spark-role -> driver
>pod uid: 90558303-54e7-11e8-9e64-525400da65d8
>creation time: 2018-05-11T06:50:17Z
>service account name: default
>volumes: spark-local-dir-0-spark-local, spark-init-properties, 
> download-jars-volume, download-files, spark-init-secret, hadoop-properties, 
> default-token-xvjt9
>node name: tbds-100-98-45-69
>start time: 

[jira] [Commented] (SPARK-31887) Date casting to string is giving wrong value

2020-06-20 Thread Hyukjin Kwon (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17141246#comment-17141246
 ] 

Hyukjin Kwon commented on SPARK-31887:
--

Very likely there won't be 2.5 and 2.6. So we should be good here.

> Date casting to string is giving wrong value
> 
>
> Key: SPARK-31887
> URL: https://issues.apache.org/jira/browse/SPARK-31887
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.5
> Environment: The spark is running on cluster mode with Mesos.
>  
> Mesos agents are dockerised running on Ubuntu 18.
>  
> Timezone setting of docker instance: UTC
> Timezone of server hosting docker: America/New_York
> Timezone of driver machine: America/New_York
>Reporter: Amit Gupta
>Priority: Major
>
> The code converts the string to a date and then writes it to CSV.
> {code:java}
> val x = Seq(("2020-02-19", "2020-02-19 05:11:00")).toDF("a", 
> "b").select('a.cast("date"), 'b.cast("timestamp"))
> x.show()
> +----------+-------------------+
> |         a|                  b|
> +----------+-------------------+
> |2020-02-19|2020-02-19 05:11:00|
> +----------+-------------------+
> x.write.mode("overwrite").option("header", true).csv("/tmp/test1.csv")
> {code}
>  
> The date written in the CSV file is different:
> {code:java}
> > snakebite cat "/tmp/test1.csv/*.csv"
> a,b
> 2020-02-18,2020-02-19T05:11:00.000Z{code}
>  
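Possibly related, a hedged sketch only (not a confirmed fix for this ticket): pinning the session time zone, and optionally the CSV dateFormat/timestampFormat options, can keep the written text in line with what show() displays, since the writer renders date/timestamp values using the session settings:
{code:scala}
// Illustrative: align the session time zone with the driver's zone before writing.
spark.conf.set("spark.sql.session.timeZone", "America/New_York")

x.write
  .mode("overwrite")
  .option("header", true)
  .option("dateFormat", "yyyy-MM-dd")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .csv("/tmp/test1.csv")
{code}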






[jira] [Updated] (SPARK-24266) Spark client terminates while driver is still running

2020-06-20 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-24266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-24266:
-
Target Version/s:   (was: 3.1.0, 2.4.7)

> Spark client terminates while driver is still running
> -
>
> Key: SPARK-24266
> URL: https://issues.apache.org/jira/browse/SPARK-24266
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes, Spark Core
>Affects Versions: 2.3.0, 3.0.0
>Reporter: Chun Chen
>Priority: Major
>
>volumes: spark-local-dir-0-spark-local, spark-init-properties, 
> download-jars-volume, download-files, spark-init-secret, hadoop-properties, 
> default-token-xvjt9
>node name: tbds-100-98-45-69
>start 

[jira] [Assigned] (SPARK-32019) Add spark.sql.files.minPartitionNum config

2020-06-20 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun reassigned SPARK-32019:
-

Assignee: ulysses you

> Add spark.sql.files.minPartitionNum config
> --
>
> Key: SPARK-32019
> URL: https://issues.apache.org/jira/browse/SPARK-32019
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: ulysses you
>Assignee: ulysses you
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-32019) Add spark.sql.files.minPartitionNum config

2020-06-20 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-32019.
---
Fix Version/s: 3.1.0
   Resolution: Fixed

Issue resolved by pull request 28853
[https://github.com/apache/spark/pull/28853]

> Add spark.sql.files.minPartitionNum config
> --
>
> Key: SPARK-32019
> URL: https://issues.apache.org/jira/browse/SPARK-32019
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: ulysses you
>Assignee: ulysses you
>Priority: Minor
> Fix For: 3.1.0
>
>
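Since the ticket body is empty, a usage sketch may help. The config name comes from the ticket title; treating it as a lower bound on the number of read partitions, the value used, and the input path are all assumptions:

{code:scala}
// Illustrative only: spark.sql.files.minPartitionNum comes from the ticket
// title; reading it as "at least this many partitions for file scans" is an
// assumption, and the path below is hypothetical.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

spark.conf.set("spark.sql.files.minPartitionNum", "8")

val df = spark.read.parquet("/tmp/some_table")   // hypothetical path
println(df.rdd.getNumPartitions)
{code}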




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32038) Regression in handling NaN values in COUNT(DISTINCT)

2020-06-20 Thread Dongjoon Hyun (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17141218#comment-17141218
 ] 

Dongjoon Hyun commented on SPARK-32038:
---

Since this is a correctness regression, I raised the priority to `Blocker` with 
target version `3.0.1`.

> Regression in handling NaN values in COUNT(DISTINCT)
> 
>
> Key: SPARK-32038
> URL: https://issues.apache.org/jira/browse/SPARK-32038
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Mithun Radhakrishnan
>Priority: Blocker
>  Labels: correctness
>
> There seems to be a regression in Spark 3.0.0, with regard to how {{NaN}} 
> values are normalized/handled in {{COUNT(DISTINCT ...)}}. Here is an 
> illustration:
> {code:scala}
> case class Test( uid:String, score:Float)
> val POS_NAN_1 = java.lang.Float.intBitsToFloat(0x7f81)
> val POS_NAN_2 = java.lang.Float.intBitsToFloat(0x7fff)
> val rows = Seq(
>  Test("mithunr",  Float.NaN), 
>  Test("mithunr",  POS_NAN_1),
>  Test("mithunr",  POS_NAN_2),
>  Test("abellina", 1.0f),
>  Test("abellina", 2.0f)
> ).toDF.createOrReplaceTempView("mytable")
> spark.sql(" select uid, count(distinct score) from mytable group by 1 order 
> by 1 asc ").show
> {code}
> Here are the results under Spark 3.0.0:
> {code:java|title=Spark 3.0.0 (single aggregation)}
> +--------+---------------------+
> |     uid|count(DISTINCT score)|
> +--------+---------------------+
> |abellina|                    2|
> | mithunr|                    3|
> +--------+---------------------+
> {code}
> Note that the count against {{mithunr}} is {{3}}, accounting for each 
> distinct value for {{NaN}}.
>  The right results are returned when another aggregation is added to the GBY:
> {code:scala|title=Spark 3.0.0 (multiple aggregations)}
> scala> spark.sql(" select uid, count(distinct score), max(score) from mytable 
> group by 1 order by 1 asc ").show
> +--------+---------------------+----------+
> |     uid|count(DISTINCT score)|max(score)|
> +--------+---------------------+----------+
> |abellina|                    2|       2.0|
> | mithunr|                    1|       NaN|
> +--------+---------------------+----------+
> {code}
> Also, note that Spark 2.4.6 normalizes the {{DISTINCT}} expression correctly:
> {code:scala|title=Spark 2.4.6}
> scala> spark.sql(" select uid, count(distinct score) from mytable group by 1 
> order by 1 asc ").show
> +--------+---------------------+
> |     uid|count(DISTINCT score)|
> +--------+---------------------+
> |abellina|                    2|
> | mithunr|                    1|
> +--------+---------------------+
> {code}
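For illustration, a small standalone sketch of why NaN normalization matters here: 0x7f800001 and 0x7fffffff are two canonical float NaN bit patterns (chosen for this sketch, not copied from the report), and both answer true to isNaN while differing at the byte level:

{code:scala}
// Sketch: all NaN bit patterns report isNaN == true, but they can differ at
// the binary level, which is why COUNT(DISTINCT) must normalize NaN before
// comparing values byte-wise.
object NaNBits {
  def main(args: Array[String]): Unit = {
    val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)  // a signaling NaN
    val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)  // a quiet NaN

    println(nan1.isNaN && nan2.isNaN)                             // true
    println(java.lang.Float.floatToRawIntBits(nan1).toHexString)  // 7f800001
    println(java.lang.Float.floatToRawIntBits(nan2).toHexString)  // 7fffffff

    // Without normalization, a byte-wise distinct sees two values here; with
    // normalization (floatToIntBits canonicalizes NaN), they collapse to one.
    println(java.lang.Float.floatToIntBits(nan1) ==
            java.lang.Float.floatToIntBits(nan2))                 // true
  }
}
{code}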



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32038) Regression in handling NaN values in COUNT(DISTINCT)

2020-06-20 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-32038:
--
Priority: Blocker  (was: Major)

> Regression in handling NaN values in COUNT(DISTINCT)
> 
>
> Key: SPARK-32038
> URL: https://issues.apache.org/jira/browse/SPARK-32038
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Mithun Radhakrishnan
>Priority: Blocker
>  Labels: correctness
>
> There seems to be a regression in Spark 3.0.0, with regard to how {{NaN}} 
> values are normalized/handled in {{COUNT(DISTINCT ...)}}. Here is an 
> illustration:
> {code:scala}
> case class Test( uid:String, score:Float)
> val POS_NAN_1 = java.lang.Float.intBitsToFloat(0x7f81)
> val POS_NAN_2 = java.lang.Float.intBitsToFloat(0x7fff)
> val rows = Seq(
>  Test("mithunr",  Float.NaN), 
>  Test("mithunr",  POS_NAN_1),
>  Test("mithunr",  POS_NAN_2),
>  Test("abellina", 1.0f),
>  Test("abellina", 2.0f)
> ).toDF.createOrReplaceTempView("mytable")
> spark.sql(" select uid, count(distinct score) from mytable group by 1 order 
> by 1 asc ").show
> {code}
> Here are the results under Spark 3.0.0:
> {code:java|title=Spark 3.0.0 (single aggregation)}
> ++-+
> | uid|count(DISTINCT score)|
> ++-+
> |abellina|2|
> | mithunr|3|
> ++-+
> {code}
> Note that the count against {{mithunr}} is {{3}}, accounting for each 
> distinct value for {{NaN}}.
>  The right results are returned when another aggregation is added to the GBY:
> {code:scala|title=Spark 3.0.0 (multiple aggregations)}
> scala> spark.sql(" select uid, count(distinct score), max(score) from mytable 
> group by 1 order by 1 asc ").show
> ++-+--+
> | uid|count(DISTINCT score)|max(score)|
> ++-+--+
> |abellina|2|   2.0|
> | mithunr|1|   NaN|
> ++-+--+
> {code}
> Also, note that Spark 2.4.6 normalizes the {{DISTINCT}} expression correctly:
> {code:scala|title=Spark 2.4.6}
> scala> spark.sql(" select uid, count(distinct score) from mytable group by 1 
> order by 1 asc ").show
> ++-+
> | uid|count(DISTINCT score)|
> ++-+
> |abellina|2|
> | mithunr|1|
> ++-+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32038) Regression in handling NaN values in COUNT(DISTINCT)

2020-06-20 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-32038:
--
Target Version/s: 3.0.1

> Regression in handling NaN values in COUNT(DISTINCT)
> 
>
> Key: SPARK-32038
> URL: https://issues.apache.org/jira/browse/SPARK-32038
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Mithun Radhakrishnan
>Priority: Major
>  Labels: correctness
>
> There seems to be a regression in Spark 3.0.0, with regard to how {{NaN}} 
> values are normalized/handled in {{COUNT(DISTINCT ...)}}. Here is an 
> illustration:
> {code:scala}
> case class Test( uid:String, score:Float)
> val POS_NAN_1 = java.lang.Float.intBitsToFloat(0x7f81)
> val POS_NAN_2 = java.lang.Float.intBitsToFloat(0x7fff)
> val rows = Seq(
>  Test("mithunr",  Float.NaN), 
>  Test("mithunr",  POS_NAN_1),
>  Test("mithunr",  POS_NAN_2),
>  Test("abellina", 1.0f),
>  Test("abellina", 2.0f)
> ).toDF.createOrReplaceTempView("mytable")
> spark.sql(" select uid, count(distinct score) from mytable group by 1 order 
> by 1 asc ").show
> {code}
> Here are the results under Spark 3.0.0:
> {code:java|title=Spark 3.0.0 (single aggregation)}
> ++-+
> | uid|count(DISTINCT score)|
> ++-+
> |abellina|2|
> | mithunr|3|
> ++-+
> {code}
> Note that the count against {{mithunr}} is {{3}}, accounting for each 
> distinct value for {{NaN}}.
>  The right results are returned when another aggregation is added to the GBY:
> {code:scala|title=Spark 3.0.0 (multiple aggregations)}
> scala> spark.sql(" select uid, count(distinct score), max(score) from mytable 
> group by 1 order by 1 asc ").show
> ++-+--+
> | uid|count(DISTINCT score)|max(score)|
> ++-+--+
> |abellina|2|   2.0|
> | mithunr|1|   NaN|
> ++-+--+
> {code}
> Also, note that Spark 2.4.6 normalizes the {{DISTINCT}} expression correctly:
> {code:scala|title=Spark 2.4.6}
> scala> spark.sql(" select uid, count(distinct score) from mytable group by 1 
> order by 1 asc ").show
> ++-+
> | uid|count(DISTINCT score)|
> ++-+
> |abellina|2|
> | mithunr|1|
> ++-+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32038) Regression in handling NaN values in COUNT(DISTINCT)

2020-06-20 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-32038:
--
Labels: correctness  (was: )

> Regression in handling NaN values in COUNT(DISTINCT)
> 
>
> Key: SPARK-32038
> URL: https://issues.apache.org/jira/browse/SPARK-32038
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Mithun Radhakrishnan
>Priority: Major
>  Labels: correctness
>
> There seems to be a regression in Spark 3.0.0, with regard to how {{NaN}} 
> values are normalized/handled in {{COUNT(DISTINCT ...)}}. Here is an 
> illustration:
> {code:scala}
> case class Test( uid:String, score:Float)
> val POS_NAN_1 = java.lang.Float.intBitsToFloat(0x7f81)
> val POS_NAN_2 = java.lang.Float.intBitsToFloat(0x7fff)
> val rows = Seq(
>  Test("mithunr",  Float.NaN), 
>  Test("mithunr",  POS_NAN_1),
>  Test("mithunr",  POS_NAN_2),
>  Test("abellina", 1.0f),
>  Test("abellina", 2.0f)
> ).toDF.createOrReplaceTempView("mytable")
> spark.sql(" select uid, count(distinct score) from mytable group by 1 order 
> by 1 asc ").show
> {code}
> Here are the results under Spark 3.0.0:
> {code:java|title=Spark 3.0.0 (single aggregation)}
> ++-+
> | uid|count(DISTINCT score)|
> ++-+
> |abellina|2|
> | mithunr|3|
> ++-+
> {code}
> Note that the count against {{mithunr}} is {{3}}, accounting for each 
> distinct value for {{NaN}}.
>  The right results are returned when another aggregation is added to the GBY:
> {code:scala|title=Spark 3.0.0 (multiple aggregations)}
> scala> spark.sql(" select uid, count(distinct score), max(score) from mytable 
> group by 1 order by 1 asc ").show
> ++-+--+
> | uid|count(DISTINCT score)|max(score)|
> ++-+--+
> |abellina|2|   2.0|
> | mithunr|1|   NaN|
> ++-+--+
> {code}
> Also, note that Spark 2.4.6 normalizes the {{DISTINCT}} expression correctly:
> {code:scala|title=Spark 2.4.6}
> scala> spark.sql(" select uid, count(distinct score) from mytable group by 1 
> order by 1 asc ").show
> ++-+
> | uid|count(DISTINCT score)|
> ++-+
> |abellina|2|
> | mithunr|1|
> ++-+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32038) Regression in handling NaN values in COUNT(DISTINCT)

2020-06-20 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-32038:
--
Component/s: (was: Optimizer)

> Regression in handling NaN values in COUNT(DISTINCT)
> 
>
> Key: SPARK-32038
> URL: https://issues.apache.org/jira/browse/SPARK-32038
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Mithun Radhakrishnan
>Priority: Major
>
> There seems to be a regression in Spark 3.0.0, with regard to how {{NaN}} 
> values are normalized/handled in {{COUNT(DISTINCT ...)}}. Here is an 
> illustration:
> {code:scala}
> case class Test( uid:String, score:Float)
> val POS_NAN_1 = java.lang.Float.intBitsToFloat(0x7f81)
> val POS_NAN_2 = java.lang.Float.intBitsToFloat(0x7fff)
> val rows = Seq(
>  Test("mithunr",  Float.NaN), 
>  Test("mithunr",  POS_NAN_1),
>  Test("mithunr",  POS_NAN_2),
>  Test("abellina", 1.0f),
>  Test("abellina", 2.0f)
> ).toDF.createOrReplaceTempView("mytable")
> spark.sql(" select uid, count(distinct score) from mytable group by 1 order 
> by 1 asc ").show
> {code}
> Here are the results under Spark 3.0.0:
> {code:java|title=Spark 3.0.0 (single aggregation)}
> ++-+
> | uid|count(DISTINCT score)|
> ++-+
> |abellina|2|
> | mithunr|3|
> ++-+
> {code}
> Note that the count against {{mithunr}} is {{3}}, accounting for each 
> distinct value for {{NaN}}.
>  The right results are returned when another aggregation is added to the GBY:
> {code:scala|title=Spark 3.0.0 (multiple aggregations)}
> scala> spark.sql(" select uid, count(distinct score), max(score) from mytable 
> group by 1 order by 1 asc ").show
> ++-+--+
> | uid|count(DISTINCT score)|max(score)|
> ++-+--+
> |abellina|2|   2.0|
> | mithunr|1|   NaN|
> ++-+--+
> {code}
> Also, note that Spark 2.4.6 normalizes the {{DISTINCT}} expression correctly:
> {code:scala|title=Spark 2.4.6}
> scala> spark.sql(" select uid, count(distinct score) from mytable group by 1 
> order by 1 asc ").show
> ++-+
> | uid|count(DISTINCT score)|
> ++-+
> |abellina|2|
> | mithunr|1|
> ++-+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32025) CSV schema inference with boolean & integer

2020-06-20 Thread Pablo Langa Blanco (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17141213#comment-17141213
 ] 

Pablo Langa Blanco commented on SPARK-32025:


I'm looking into the problem. As a workaround, you can define the schema 
explicitly to avoid the bug in automatic schema inference.

> CSV schema inference with boolean & integer 
> 
>
> Key: SPARK-32025
> URL: https://issues.apache.org/jira/browse/SPARK-32025
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.6
>Reporter: Brian Wallace
>Priority: Major
>
> I have a dataset consisting of two small files in CSV format. 
> {code:bash}
> $ cat /example/f0.csv
> col1
> 8589934592
> $ cat /example/f1.csv
> col1
> 4320
> true
> {code}
>  
> When I try to load this in (py)spark with schema inference, my expectation is 
> that the column is inferred as a string. However, it is inferred as a 
> boolean:
> {code:python}
> spark.read.csv(path="file:///example/*.csv", header=True, inferSchema=True, 
> multiLine=True).show()
> +----+
> |col1|
> +----+
> |null|
> |true|
> |null|
> +----+
> {code}
> Note that this seems to work correctly if multiLine is set to False (although 
> we need to set it to True as this column may indeed span multiple lines in 
> general).
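A sketch of the workaround mentioned in the comment above (supplying the schema explicitly), written in Scala rather than PySpark; the column name and path come from the report, everything else is illustrative:

{code:scala}
// Workaround sketch: supply the schema explicitly instead of relying on
// inference together with multiLine.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val schema = StructType(Seq(StructField("col1", StringType, nullable = true)))

spark.read
  .option("header", "true")
  .option("multiLine", "true")
  .schema(schema)                  // no inference, so no wrong boolean type
  .csv("file:///example/*.csv")    // path taken from the report
  .show()
{code}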



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32040) Idle cores not being allocated

2020-06-20 Thread t oo (Jira)
t oo created SPARK-32040:


 Summary: Idle cores not being allocated
 Key: SPARK-32040
 URL: https://issues.apache.org/jira/browse/SPARK-32040
 Project: Spark
  Issue Type: Bug
  Components: Scheduler
Affects Versions: 2.4.5
Reporter: t oo


Background: 
I have a Spark 2.4.5 cluster in standalone mode, orchestrated by Nomad jobs 
running on EC2. We deploy a Scala web server as a long-running jar via 
`spark-submit` in client mode. Sometimes our in-house autoscaler scales down 
and kills workers without checking whether any of their cores were allocated 
to existing applications. Those applications then end up with 0 cores, even 
though there are healthy workers in the cluster. 

However, only when I submit a new application or register a new worker in the 
cluster will the master finally reallocate cores to the application. This is 
problematic, because the long-running application is stuck with 0 cores. 

Could this be related to the fact that `schedule()` is only triggered by new 
workers / new applications as commented here? 
[https://github.com/apache/spark/blob/v2.4.5/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L721-L724]

If that is the case, should the master also be calling `schedule()` after 
removing workers in `timeOutWorkers()`? 
[https://github.com/apache/spark/blob/v2.4.5/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L417]

The downscaling causes me to see this in my logs, so I am fairly certain 
`timeOutWorkers()` is being called: 
``` 
20/06/08 11:40:56 INFO Master: Application app-20200608114056-0006 requested 
to set total executors to 1. 
20/06/08 11:40:56 INFO Master: Launching executor app-20200608114056-0006/0 
on worker worker-20200608113523--7077 
20/06/08 11:41:44 WARN Master: Removing 
worker-20200608113523--7077 because we got no heartbeat in 60 
seconds 
20/06/08 11:41:44 INFO Master: Removing worker 
worker-20200608113523--7077 on :7077 
20/06/08 11:41:44 INFO Master: Telling app of lost executor: 0 
20/06/08 11:41:44 INFO Master: Telling app of lost worker: 
worker-20200608113523-10.158.242.213-7077 
```
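To make the scheduling gap concrete, here is a tiny self-contained model of the behaviour described above; it is not Spark's Master code, and every name in it is invented for illustration:

{code:scala}
// Toy model (not Spark source): shows why an application can stay at zero
// cores when scheduling is only triggered by new workers/applications.
object TinyMaster {
  final case class Worker(id: String, freeCores: Int)

  var workers: List[Worker] = List(Worker("w1", 4))
  var appCores: Int = 0

  // In the real Master, schedule() runs when a new app or worker registers.
  def schedule(): Unit = {
    appCores = workers.map(_.freeCores).sum
  }

  // Timing out a worker removes it but, per the report, does not re-schedule.
  def timeOutWorker(id: String): Unit = {
    workers = workers.filterNot(_.id == id)
    appCores = 0
    // Reporter's question: should schedule() also be called here?
  }

  def main(args: Array[String]): Unit = {
    schedule()                           // app gets 4 cores from w1
    workers = Worker("w2", 4) :: workers // healthy worker joins, no schedule()
    timeOutWorker("w1")                  // w1 is killed by the autoscaler
    println(appCores)                    // 0 until something triggers schedule()
  }
}
{code}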



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32021) make_interval does not accept seconds >100

2020-06-20 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-32021:
--
Fix Version/s: (was: 3.0.0)
   3.0.1

> make_interval does not accept seconds >100
> --
>
> Key: SPARK-32021
> URL: https://issues.apache.org/jira/browse/SPARK-32021
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Juliusz Sompolski
>Assignee: Maxim Gekk
>Priority: Major
> Fix For: 3.0.1, 3.1.0
>
>
> In make_interval(years, months, weeks, days, hours, mins, secs), secs is 
> defined as Decimal(8, 6), so the expression returns null whenever the seconds 
> value reaches 100 or more.
> Larger seconds values should be allowed.
> This has been reported by Simba, who wants to use make_interval to implement 
> translation for TIMESTAMP_ADD ODBC function in Spark 3.0.
> ODBC {fn TIMESTAMPADD(SECOND, integer_exp, timestamp} fails when integer_exp 
> returns seconds values >= 100.
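A minimal reproduction sketch based on the description above; the NULL result for seconds >= 100 is taken from the report, and the exact values used are arbitrary:

{code:scala}
// Sketch per the report: with secs typed as Decimal(8, 6), any seconds value
// of 100 or more overflows and the whole interval becomes NULL.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

spark.sql("SELECT make_interval(0, 0, 0, 0, 0, 0, 99.5)").show()  // works
spark.sql("SELECT make_interval(0, 0, 0, 0, 0, 0, 120)").show()   // NULL per the report
{code}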



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32021) make_interval does not accept seconds >100

2020-06-20 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-32021:
--
Fix Version/s: 3.0.0

> make_interval does not accept seconds >100
> --
>
> Key: SPARK-32021
> URL: https://issues.apache.org/jira/browse/SPARK-32021
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Juliusz Sompolski
>Assignee: Maxim Gekk
>Priority: Major
> Fix For: 3.0.0, 3.1.0
>
>
> In make_interval(years, months, weeks, days, hours, mins, secs), secs are 
> defined as Decimal(8, 6), which turns into null if the value of the 
> expression overflows 100 seconds.
> Larger seconds values should be allowed.
> This has been reported by Simba, who wants to use make_interval to implement 
> translation for TIMESTAMP_ADD ODBC function in Spark 3.0.
> ODBC {fn TIMESTAMPADD(SECOND, integer_exp, timestamp} fails when integer_exp 
> returns seconds values >= 100.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32037) Rename blacklisting feature to avoid language with racist connotation

2020-06-20 Thread Meniluca (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17141091#comment-17141091
 ] 

Meniluca commented on SPARK-32037:
--

I second the idea, and I prefer "healthy" over the other words mentioned so far. 
However, if I may, I'd like to add a few more ideas. Considering the moment, one 
could of course propose "*quarantined*". Another option compares nodes to 
players: when they don't perform, you leave them on the bench, hence 
"*benched*". The last one, as mentioned at the [2017 Spark Summit (minute 
20:56)|https://youtu.be/OPPg4JeWkzs?t=1255], is that such executors are (to use 
a Latin expression from diplomacy) [persona non 
grata|https://en.wikipedia.org/wiki/Persona_non_grata], which roughly means 
"*exiled*".

I prefer "benched", as it carries no really negative connotation compared to 
the other adjectives.

> Rename blacklisting feature to avoid language with racist connotation
> -
>
> Key: SPARK-32037
> URL: https://issues.apache.org/jira/browse/SPARK-32037
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.1
>Reporter: Erik Krogen
>Priority: Minor
>
> As per [discussion on the Spark dev 
> list|https://lists.apache.org/thread.html/rf6b2cdcba4d3875350517a2339619e5d54e12e66626a88553f9fe275%40%3Cdev.spark.apache.org%3E],
>  it will be beneficial to remove references to problematic language that can 
> alienate potential community members. One such reference is "blacklist". 
> While it seems to me that there is some valid debate as to whether this term 
> has racist origins, the cultural connotations are inescapable in today's 
> world.
> I've created a separate task, SPARK-32036, to remove references outside of 
> this feature. Given the large surface area of this feature and the 
> public-facing UI / configs / etc., more care will need to be taken here.
> I'd like to start by opening up debate on what the best replacement name 
> would be. Reject-/deny-/ignore-/block-list are common replacements for 
> "blacklist", but I'm not sure that any of them work well for this situation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-31893) Add a generic ClassificationSummary trait

2020-06-20 Thread Sean R. Owen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean R. Owen resolved SPARK-31893.
--
Fix Version/s: 3.1.0
 Assignee: Huaxin Gao
   Resolution: Fixed

Resolved by https://github.com/apache/spark/pull/28710

> Add a generic ClassificationSummary trait
> -
>
> Key: SPARK-31893
> URL: https://issues.apache.org/jira/browse/SPARK-31893
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 3.1.0
>Reporter: Huaxin Gao
>Assignee: Huaxin Gao
>Priority: Minor
> Fix For: 3.1.0
>
>
> Add a generic ClassificationSummary trait, so all the classification models 
> can use it to implement model summary.
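A purely hypothetical sketch of what a shared classification summary trait could look like; this is not the API added by the pull request, only an illustration of the idea of classifiers exposing a common summary abstraction:

{code:scala}
// Hypothetical sketch only -- not the trait added in Spark.
import org.apache.spark.sql.DataFrame

trait MyClassificationSummary {
  def predictions: DataFrame   // model output with prediction/label columns
  def predictionCol: String
  def labelCol: String
  def accuracy: Double         // example of a metric every classifier can expose
}
{code}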



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32039) Unable to Set `spark.ui.port` configuration in Yarn Cluster Mode

2020-06-20 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17141065#comment-17141065
 ] 

Apache Spark commented on SPARK-32039:
--

User 'rajatahujaatinmobi' has created a pull request for this issue:
https://github.com/apache/spark/pull/28879

> Unable to Set `spark.ui.port` configuration in Yarn Cluster Mode
> 
>
> Key: SPARK-32039
> URL: https://issues.apache.org/jira/browse/SPARK-32039
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: rajat
>Priority: Critical
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Spark Web UI port in Yarn cluster mode always gets a random number since we 
> disable the configuration spark.ui.port by setting this property to be 0 
> always. 
> There are some use cases where we need to run Web UI port on some specified 
> range because of some security concerns and unable to do so with the 
> configuration. 
>  
> [https://github.com/apache/spark/blob/842b1dcdff0ecab4af9f292c2ff7b2b9ae1ac40a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala#L216]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-32039) Unable to Set `spark.ui.port` configuration in Yarn Cluster Mode

2020-06-20 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-32039:


Assignee: Apache Spark

> Unable to Set `spark.ui.port` configuration in Yarn Cluster Mode
> 
>
> Key: SPARK-32039
> URL: https://issues.apache.org/jira/browse/SPARK-32039
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: rajat
>Assignee: Apache Spark
>Priority: Critical
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Spark Web UI port in Yarn cluster mode always gets a random number since we 
> disable the configuration spark.ui.port by setting this property to be 0 
> always. 
> There are some use cases where we need to run Web UI port on some specified 
> range because of some security concerns and unable to do so with the 
> configuration. 
>  
> [https://github.com/apache/spark/blob/842b1dcdff0ecab4af9f292c2ff7b2b9ae1ac40a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala#L216]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-32039) Unable to Set `spark.ui.port` configuration in Yarn Cluster Mode

2020-06-20 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-32039:


Assignee: (was: Apache Spark)

> Unable to Set `spark.ui.port` configuration in Yarn Cluster Mode
> 
>
> Key: SPARK-32039
> URL: https://issues.apache.org/jira/browse/SPARK-32039
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: rajat
>Priority: Critical
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Spark Web UI port in Yarn cluster mode always gets a random number since we 
> disable the configuration spark.ui.port by setting this property to be 0 
> always. 
> There are some use cases where we need to run Web UI port on some specified 
> range because of some security concerns and unable to do so with the 
> configuration. 
>  
> [https://github.com/apache/spark/blob/842b1dcdff0ecab4af9f292c2ff7b2b9ae1ac40a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala#L216]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32039) Unable to Set `spark.ui.port` configuration in Yarn Cluster Mode

2020-06-20 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17141064#comment-17141064
 ] 

Apache Spark commented on SPARK-32039:
--

User 'rajatahujaatinmobi' has created a pull request for this issue:
https://github.com/apache/spark/pull/28879

> Unable to Set `spark.ui.port` configuration in Yarn Cluster Mode
> 
>
> Key: SPARK-32039
> URL: https://issues.apache.org/jira/browse/SPARK-32039
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: rajat
>Priority: Critical
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Spark Web UI port in Yarn cluster mode always gets a random number since we 
> disable the configuration spark.ui.port by setting this property to be 0 
> always. 
> There are some use cases where we need to run Web UI port on some specified 
> range because of some security concerns and unable to do so with the 
> configuration. 
>  
> [https://github.com/apache/spark/blob/842b1dcdff0ecab4af9f292c2ff7b2b9ae1ac40a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala#L216]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32039) Unable to Set `spark.ui.port` configuration in Yarn Cluster Mode

2020-06-20 Thread rajat (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

rajat updated SPARK-32039:
--
Description: 
In YARN cluster mode, the Spark Web UI always gets a random port because the 
spark.ui.port configuration is always overridden to 0. 

There are use cases where, for security reasons, the Web UI needs to listen on 
a port from a specific range, and this is currently not possible through 
configuration. 

 

[https://github.com/apache/spark/blob/842b1dcdff0ecab4af9f292c2ff7b2b9ae1ac40a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala#L216]

  was:
Spark Web UI port in Yarn cluster mode always gets a random number since we 
disable the configuration spark.ui.port by setting this property to be 0 
always. 

There are some use cases where we need to run Web UI port on some specified 
range because of some security concerns and unable to do so with the 
configuration. 


> Unable to Set `spark.ui.port` configuration in Yarn Cluster Mode
> 
>
> Key: SPARK-32039
> URL: https://issues.apache.org/jira/browse/SPARK-32039
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: rajat
>Priority: Critical
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Spark Web UI port in Yarn cluster mode always gets a random number since we 
> disable the configuration spark.ui.port by setting this property to be 0 
> always. 
> There are some use cases where we need to run Web UI port on some specified 
> range because of some security concerns and unable to do so with the 
> configuration. 
>  
> [https://github.com/apache/spark/blob/842b1dcdff0ecab4af9f292c2ff7b2b9ae1ac40a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala#L216]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32039) Unable to Set `spark.ui.port` configuration in Yarn Cluster Mode

2020-06-20 Thread rajat (Jira)
rajat created SPARK-32039:
-

 Summary: Unable to Set `spark.ui.port` configuration in Yarn 
Cluster Mode
 Key: SPARK-32039
 URL: https://issues.apache.org/jira/browse/SPARK-32039
 Project: Spark
  Issue Type: Bug
  Components: Web UI
Affects Versions: 2.4.0
Reporter: rajat


In YARN cluster mode, the Spark Web UI always gets a random port because the 
spark.ui.port configuration is always overridden to 0. 

There are use cases where, for security reasons, the Web UI needs to listen on 
a port from a specific range, and this is currently not possible through 
configuration. 
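For context, a sketch of how the port would normally be configured; spark.ui.port is a real setting, but per the linked ApplicationMaster code it is overridden in YARN cluster mode, so the value below only takes effect in other deploy modes. The port number is an arbitrary example:

{code:scala}
// Illustrative only: spark.ui.port is honoured in client/standalone modes,
// but per this ticket it is forced to 0 (random port) in YARN cluster mode.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ui-port-example")
  .config("spark.ui.port", "4050")   // arbitrary example port
  .getOrCreate()
{code}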



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32021) make_interval does not accept seconds >100

2020-06-20 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17141016#comment-17141016
 ] 

Apache Spark commented on SPARK-32021:
--

User 'MaxGekk' has created a pull request for this issue:
https://github.com/apache/spark/pull/28878

> make_interval does not accept seconds >100
> --
>
> Key: SPARK-32021
> URL: https://issues.apache.org/jira/browse/SPARK-32021
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Juliusz Sompolski
>Assignee: Maxim Gekk
>Priority: Major
> Fix For: 3.1.0
>
>
> In make_interval(years, months, weeks, days, hours, mins, secs), secs are 
> defined as Decimal(8, 6), which turns into null if the value of the 
> expression overflows 100 seconds.
> Larger seconds values should be allowed.
> This has been reported by Simba, who wants to use make_interval to implement 
> translation for TIMESTAMP_ADD ODBC function in Spark 3.0.
> ODBC {fn TIMESTAMPADD(SECOND, integer_exp, timestamp} fails when integer_exp 
> returns seconds values >= 100.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32021) make_interval does not accept seconds >100

2020-06-20 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17141015#comment-17141015
 ] 

Apache Spark commented on SPARK-32021:
--

User 'MaxGekk' has created a pull request for this issue:
https://github.com/apache/spark/pull/28878

> make_interval does not accept seconds >100
> --
>
> Key: SPARK-32021
> URL: https://issues.apache.org/jira/browse/SPARK-32021
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Juliusz Sompolski
>Assignee: Maxim Gekk
>Priority: Major
> Fix For: 3.1.0
>
>
> In make_interval(years, months, weeks, days, hours, mins, secs), secs are 
> defined as Decimal(8, 6), which turns into null if the value of the 
> expression overflows 100 seconds.
> Larger seconds values should be allowed.
> This has been reported by Simba, who wants to use make_interval to implement 
> translation for TIMESTAMP_ADD ODBC function in Spark 3.0.
> ODBC {fn TIMESTAMPADD(SECOND, integer_exp, timestamp} fails when integer_exp 
> returns seconds values >= 100.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-31980) Spark sequence() fails if start and end of range are identical dates

2020-06-20 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140995#comment-17140995
 ] 

Apache Spark commented on SPARK-31980:
--

User 'TJX2014' has created a pull request for this issue:
https://github.com/apache/spark/pull/28877

> Spark sequence() fails if start and end of range are identical dates
> 
>
> Key: SPARK-31980
> URL: https://issues.apache.org/jira/browse/SPARK-31980
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0, 2.4.1, 2.4.2, 2.4.3, 2.4.4, 2.4.5, 2.4.6
> Environment: Spark 2.4.4 standalone and on AWS EMR
>Reporter: Dave DeCaprio
>Assignee: JinxinTang
>Priority: Minor
> Fix For: 3.0.1, 3.1.0, 2.4.7
>
>
>  
> The following Spark SQL query throws an exception
> {code:java}
> select sequence(cast("2011-03-01" as date), cast("2011-03-01" as date), 
> interval 1 month)
> {code}
> The error is:
>  
>  
> {noformat}
> java.lang.ArrayIndexOutOfBoundsException: 1
>   at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:92)
>   at org.apache.spark.sql.catalyst.expressions.Sequence$TemporalSequenceImpl.eval(collectionOperations.scala:2681)
>   at org.apache.spark.sql.catalyst.expressions.Sequence.eval(collectionOperations.scala:2514)
>   at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:389){noformat}
>  
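A sketch for checking the fixed behaviour on a release that contains the fix; the expectation of a single-element array is an assumption based on the semantics of sequence(), not output copied from the ticket:

{code:scala}
// Sketch: on a release containing the fix (2.4.7 / 3.0.1 / 3.1.0 per this
// ticket), the query should return a one-element array instead of throwing.
// The expected value in the comment below is an assumption.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

spark.sql(
  """select sequence(cast("2011-03-01" as date),
    |                cast("2011-03-01" as date),
    |                interval 1 month) as s""".stripMargin
).show(false)   // expected: [2011-03-01]
{code}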



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-32038) Regression in handling NaN values in COUNT(DISTINCT)

2020-06-20 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-32038:


Assignee: Apache Spark

> Regression in handling NaN values in COUNT(DISTINCT)
> 
>
> Key: SPARK-32038
> URL: https://issues.apache.org/jira/browse/SPARK-32038
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 3.0.0
>Reporter: Mithun Radhakrishnan
>Assignee: Apache Spark
>Priority: Major
>
> There seems to be a regression in Spark 3.0.0, with regard to how {{NaN}} 
> values are normalized/handled in {{COUNT(DISTINCT ...)}}. Here is an 
> illustration:
> {code:scala}
> case class Test( uid:String, score:Float)
> val POS_NAN_1 = java.lang.Float.intBitsToFloat(0x7f81)
> val POS_NAN_2 = java.lang.Float.intBitsToFloat(0x7fff)
> val rows = Seq(
>  Test("mithunr",  Float.NaN), 
>  Test("mithunr",  POS_NAN_1),
>  Test("mithunr",  POS_NAN_2),
>  Test("abellina", 1.0f),
>  Test("abellina", 2.0f)
> ).toDF.createOrReplaceTempView("mytable")
> spark.sql(" select uid, count(distinct score) from mytable group by 1 order 
> by 1 asc ").show
> {code}
> Here are the results under Spark 3.0.0:
> {code:java|title=Spark 3.0.0 (single aggregation)}
> ++-+
> | uid|count(DISTINCT score)|
> ++-+
> |abellina|2|
> | mithunr|3|
> ++-+
> {code}
> Note that the count against {{mithunr}} is {{3}}, accounting for each 
> distinct value for {{NaN}}.
>  The right results are returned when another aggregation is added to the GBY:
> {code:scala|title=Spark 3.0.0 (multiple aggregations)}
> scala> spark.sql(" select uid, count(distinct score), max(score) from mytable 
> group by 1 order by 1 asc ").show
> ++-+--+
> | uid|count(DISTINCT score)|max(score)|
> ++-+--+
> |abellina|2|   2.0|
> | mithunr|1|   NaN|
> ++-+--+
> {code}
> Also, note that Spark 2.4.6 normalizes the {{DISTINCT}} expression correctly:
> {code:scala|title=Spark 2.4.6}
> scala> spark.sql(" select uid, count(distinct score) from mytable group by 1 
> order by 1 asc ").show
> ++-+
> | uid|count(DISTINCT score)|
> ++-+
> |abellina|2|
> | mithunr|1|
> ++-+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-32038) Regression in handling NaN values in COUNT(DISTINCT)

2020-06-20 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-32038:


Assignee: (was: Apache Spark)

> Regression in handling NaN values in COUNT(DISTINCT)
> 
>
> Key: SPARK-32038
> URL: https://issues.apache.org/jira/browse/SPARK-32038
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 3.0.0
>Reporter: Mithun Radhakrishnan
>Priority: Major
>
> There seems to be a regression in Spark 3.0.0, with regard to how {{NaN}} 
> values are normalized/handled in {{COUNT(DISTINCT ...)}}. Here is an 
> illustration:
> {code:scala}
> case class Test( uid:String, score:Float)
> val POS_NAN_1 = java.lang.Float.intBitsToFloat(0x7f81)
> val POS_NAN_2 = java.lang.Float.intBitsToFloat(0x7fff)
> val rows = Seq(
>  Test("mithunr",  Float.NaN), 
>  Test("mithunr",  POS_NAN_1),
>  Test("mithunr",  POS_NAN_2),
>  Test("abellina", 1.0f),
>  Test("abellina", 2.0f)
> ).toDF.createOrReplaceTempView("mytable")
> spark.sql(" select uid, count(distinct score) from mytable group by 1 order 
> by 1 asc ").show
> {code}
> Here are the results under Spark 3.0.0:
> {code:java|title=Spark 3.0.0 (single aggregation)}
> ++-+
> | uid|count(DISTINCT score)|
> ++-+
> |abellina|2|
> | mithunr|3|
> ++-+
> {code}
> Note that the count against {{mithunr}} is {{3}}, accounting for each 
> distinct value for {{NaN}}.
>  The right results are returned when another aggregation is added to the GBY:
> {code:scala|title=Spark 3.0.0 (multiple aggregations)}
> scala> spark.sql(" select uid, count(distinct score), max(score) from mytable 
> group by 1 order by 1 asc ").show
> ++-+--+
> | uid|count(DISTINCT score)|max(score)|
> ++-+--+
> |abellina|2|   2.0|
> | mithunr|1|   NaN|
> ++-+--+
> {code}
> Also, note that Spark 2.4.6 normalizes the {{DISTINCT}} expression correctly:
> {code:scala|title=Spark 2.4.6}
> scala> spark.sql(" select uid, count(distinct score) from mytable group by 1 
> order by 1 asc ").show
> ++-+
> | uid|count(DISTINCT score)|
> ++-+
> |abellina|2|
> | mithunr|1|
> ++-+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32038) Regression in handling NaN values in COUNT(DISTINCT)

2020-06-20 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140987#comment-17140987
 ] 

Apache Spark commented on SPARK-32038:
--

User 'viirya' has created a pull request for this issue:
https://github.com/apache/spark/pull/28876

> Regression in handling NaN values in COUNT(DISTINCT)
> 
>
> Key: SPARK-32038
> URL: https://issues.apache.org/jira/browse/SPARK-32038
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 3.0.0
>Reporter: Mithun Radhakrishnan
>Priority: Major
>
> There seems to be a regression in Spark 3.0.0, with regard to how {{NaN}} 
> values are normalized/handled in {{COUNT(DISTINCT ...)}}. Here is an 
> illustration:
> {code:scala}
> case class Test( uid:String, score:Float)
> val POS_NAN_1 = java.lang.Float.intBitsToFloat(0x7f81)
> val POS_NAN_2 = java.lang.Float.intBitsToFloat(0x7fff)
> val rows = Seq(
>  Test("mithunr",  Float.NaN), 
>  Test("mithunr",  POS_NAN_1),
>  Test("mithunr",  POS_NAN_2),
>  Test("abellina", 1.0f),
>  Test("abellina", 2.0f)
> ).toDF.createOrReplaceTempView("mytable")
> spark.sql(" select uid, count(distinct score) from mytable group by 1 order 
> by 1 asc ").show
> {code}
> Here are the results under Spark 3.0.0:
> {code:java|title=Spark 3.0.0 (single aggregation)}
> ++-+
> | uid|count(DISTINCT score)|
> ++-+
> |abellina|2|
> | mithunr|3|
> ++-+
> {code}
> Note that the count against {{mithunr}} is {{3}}, accounting for each 
> distinct value for {{NaN}}.
>  The right results are returned when another aggregation is added to the GBY:
> {code:scala|title=Spark 3.0.0 (multiple aggregations)}
> scala> spark.sql(" select uid, count(distinct score), max(score) from mytable 
> group by 1 order by 1 asc ").show
> ++-+--+
> | uid|count(DISTINCT score)|max(score)|
> ++-+--+
> |abellina|2|   2.0|
> | mithunr|1|   NaN|
> ++-+--+
> {code}
> Also, note that Spark 2.4.6 normalizes the {{DISTINCT}} expression correctly:
> {code:scala|title=Spark 2.4.6}
> scala> spark.sql(" select uid, count(distinct score) from mytable group by 1 
> order by 1 asc ").show
> ++-+
> | uid|count(DISTINCT score)|
> ++-+
> |abellina|2|
> | mithunr|1|
> ++-+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32038) Regression in handling NaN values in COUNT(DISTINCT)

2020-06-20 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140988#comment-17140988
 ] 

Apache Spark commented on SPARK-32038:
--

User 'viirya' has created a pull request for this issue:
https://github.com/apache/spark/pull/28876

> Regression in handling NaN values in COUNT(DISTINCT)
> 
>
> Key: SPARK-32038
> URL: https://issues.apache.org/jira/browse/SPARK-32038
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 3.0.0
>Reporter: Mithun Radhakrishnan
>Priority: Major
>
> There seems to be a regression in Spark 3.0.0, with regard to how {{NaN}} 
> values are normalized/handled in {{COUNT(DISTINCT ...)}}. Here is an 
> illustration:
> {code:scala}
> case class Test( uid:String, score:Float)
> val POS_NAN_1 = java.lang.Float.intBitsToFloat(0x7f81)
> val POS_NAN_2 = java.lang.Float.intBitsToFloat(0x7fff)
> val rows = Seq(
>  Test("mithunr",  Float.NaN), 
>  Test("mithunr",  POS_NAN_1),
>  Test("mithunr",  POS_NAN_2),
>  Test("abellina", 1.0f),
>  Test("abellina", 2.0f)
> ).toDF.createOrReplaceTempView("mytable")
> spark.sql(" select uid, count(distinct score) from mytable group by 1 order 
> by 1 asc ").show
> {code}
> Here are the results under Spark 3.0.0:
> {code:java|title=Spark 3.0.0 (single aggregation)}
> ++-+
> | uid|count(DISTINCT score)|
> ++-+
> |abellina|2|
> | mithunr|3|
> ++-+
> {code}
> Note that the count against {{mithunr}} is {{3}}, accounting for each 
> distinct value for {{NaN}}.
>  The right results are returned when another aggregation is added to the GBY:
> {code:scala|title=Spark 3.0.0 (multiple aggregations)}
> scala> spark.sql(" select uid, count(distinct score), max(score) from mytable 
> group by 1 order by 1 asc ").show
> ++-+--+
> | uid|count(DISTINCT score)|max(score)|
> ++-+--+
> |abellina|2|   2.0|
> | mithunr|1|   NaN|
> ++-+--+
> {code}
> Also, note that Spark 2.4.6 normalizes the {{DISTINCT}} expression correctly:
> {code:scala|title=Spark 2.4.6}
> scala> spark.sql(" select uid, count(distinct score) from mytable group by 1 
> order by 1 asc ").show
> ++-+
> | uid|count(DISTINCT score)|
> ++-+
> |abellina|2|
> | mithunr|1|
> ++-+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-31956) Do not fail if there is no ambiguous self join

2020-06-20 Thread Xiao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li updated SPARK-31956:

Fix Version/s: 3.1.0

> Do not fail if there is no ambiguous self join
> --
>
> Key: SPARK-31956
> URL: https://issues.apache.org/jira/browse/SPARK-31956
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
> Fix For: 3.0.1, 3.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-30876) Optimizer cannot infer from inferred constraints with join

2020-06-20 Thread Navin Viswanath (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140898#comment-17140898
 ] 

Navin Viswanath edited comment on SPARK-30876 at 6/20/20, 6:09 AM:
---

[~yumwang] would this be in the logical plan optimization? I was looking into 
the logical plans and got this for the following query:

 
{noformat}
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)
val z = testRelation.subquery('z)
val query = Join(x,
  Join(y, z, Inner,
  Some(("x.a".attr === "y.b".attr) &&
("y.b".attr === "z.c".attr) &&
("z.c".attr === 1)), JoinHint.NONE),
  Inner, None, JoinHint.NONE){noformat}
 

Unoptimized:
{noformat}
'Join Inner
:- SubqueryAlias x
:  +- LocalRelation , [a#0, b#1, c#2]
+- 'Join Inner, ((('x.a = 'y.b) AND ('y.b = 'z.c)) AND ('z.c = 1))
   :- SubqueryAlias y
   :  +- LocalRelation , [a#0, b#1, c#2]
   +- SubqueryAlias z
  +- LocalRelation , [a#0, b#1, c#2]{noformat}
Optimized:
{noformat}
'Join Inner
:- LocalRelation , [a#0, b#1, c#2]
+- 'Join Inner, ((('x.a = 'y.b) AND ('y.b = 'z.c)) AND ('z.c = 1))
   :- LocalRelation , [a#0, b#1, c#2]
   +- LocalRelation , [a#22, b#23, c#24]{noformat}
Or was this supposed to be in the physical plan? Any pointers would help. 
Thanks!
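One way to check whether the inference would happen during logical optimization is to compare the plans on a real session; queryExecution.optimizedPlan and queryExecution.executedPlan are existing APIs, the query and table names come from the issue description, and the tables are assumed to already exist:

{code:scala}
// Sketch: compare the optimizer output with the physical plan for the query
// from the issue description (tables t1/t2/t3 assumed to exist).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val df = spark.sql(
  """select count(*) from t1 join t2 join t3
    |on (t1.a = t2.b and t2.b = t3.c and t3.c = 1)""".stripMargin)

// Constraint-based filter inference is a logical optimizer rule, so any
// inferred predicates should already be visible here ...
println(df.queryExecution.optimizedPlan)

// ... and then carried into the physical plan.
println(df.queryExecution.executedPlan)
{code}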

 


was (Author: navinvishy):
[~yumwang] would this be in the logical plan optimization? I was looking into 
the logical plans and got this for the following query:

 
{noformat}
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)
val z = testRelation.subquery('z)
val query = x.join(y).join(z)
  .where(("x.a".attr === "y.b".attr) && ("y.b".attr === "z.c".attr) && 
("z.c".attr === 1)){noformat}
 

Unoptimized:
{noformat}
'Filter ((('x.a = 'y.b) AND ('y.b = 'z.c)) AND ('z.c = 1))
+- 'Join Inner
 :- Join Inner
 : :- SubqueryAlias x
 : : +- LocalRelation , [a#0, b#1, c#2]
 : +- SubqueryAlias y
 : +- LocalRelation , [d#3]
 +- SubqueryAlias z
 +- LocalRelation , [a#0, b#1, c#2]{noformat}
Optimized:
{noformat}
'Filter ((('x.a = 'y.b) AND ('y.b = 'z.c)) AND ('z.c = 1))
+- 'Join Inner
 :- Join Inner
 : :- LocalRelation , [a#0, b#1, c#2]
 : +- LocalRelation , [d#3]
 +- LocalRelation , [a#0, b#1, c#2]{noformat}
Or was this supposed to be in the physical plan? Any pointers would help. 
Thanks!

 

> Optimizer cannot infer from inferred constraints with join
> --
>
> Key: SPARK-30876
> URL: https://issues.apache.org/jira/browse/SPARK-30876
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Yuming Wang
>Priority: Major
>
> How to reproduce this issue:
> {code:sql}
> create table t1(a int, b int, c int);
> create table t2(a int, b int, c int);
> create table t3(a int, b int, c int);
> select count(*) from t1 join t2 join t3 on (t1.a = t2.b and t2.b = t3.c and 
> t3.c = 1);
> {code}
> Spark 2.3+:
> {noformat}
> == Physical Plan ==
> *(4) HashAggregate(keys=[], functions=[count(1)])
> +- Exchange SinglePartition, true, [id=#102]
>+- *(3) HashAggregate(keys=[], functions=[partial_count(1)])
>   +- *(3) Project
>  +- *(3) BroadcastHashJoin [b#10], [c#14], Inner, BuildRight
> :- *(3) Project [b#10]
> :  +- *(3) BroadcastHashJoin [a#6], [b#10], Inner, BuildRight
> : :- *(3) Project [a#6]
> : :  +- *(3) Filter isnotnull(a#6)
> : : +- *(3) ColumnarToRow
> : :+- FileScan parquet default.t1[a#6] Batched: true, 
> DataFilters: [isnotnull(a#6)], Format: Parquet, Location: 
> InMemoryFileIndex[file:/Users/yumwang/Downloads/spark-3.0.0-preview2-bin-hadoop2.7/spark-warehous...,
>  PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: 
> struct
> : +- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), 
> [id=#87]
> :+- *(1) Project [b#10]
> :   +- *(1) Filter (isnotnull(b#10) AND (b#10 = 1))
> :  +- *(1) ColumnarToRow
> : +- FileScan parquet default.t2[b#10] Batched: 
> true, DataFilters: [isnotnull(b#10), (b#10 = 1)], Format: Parquet, Location: 
> InMemoryFileIndex[file:/Users/yumwang/Downloads/spark-3.0.0-preview2-bin-hadoop2.7/spark-warehous...,
>  PartitionFilters: [], PushedFilters: [IsNotNull(b), EqualTo(b,1)], 
> ReadSchema: struct
> +- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), 
> [id=#96]
>+- *(2) Project [c#14]
>   +- *(2) Filter (isnotnull(c#14) AND (c#14 = 1))
>  +- *(2) ColumnarToRow
> +- FileScan parquet default.t3[c#14] Batched: true, 
> DataFilters: [isnotnull(c#14), (c#14 = 1)],