[jira] [Commented] (SPARK-41506) Refactor LiteralExpression to support DataType

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646976#comment-17646976
 ] 

Apache Spark commented on SPARK-41506:
--

User 'zhengruifeng' has created a pull request for this issue:
https://github.com/apache/spark/pull/39060

> Refactor LiteralExpression to support DataType
> --
>
> Key: SPARK-41506
> URL: https://issues.apache.org/jira/browse/SPARK-41506
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, PySpark
>Affects Versions: 3.4.0
>Reporter: Ruifeng Zheng
>Assignee: Ruifeng Zheng
>Priority: Major
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646971#comment-17646971
 ] 

wuyi edited comment on SPARK-41497 at 12/14/22 7:31 AM:


I'm thinking about whether we could improve the improved Option 4 by changing the rdd 
cache reuse condition a bit:

If no accumulator values (probably external accumulators only) changed after the 
rdd computation, then the rdd's cache should be marked as usable/visible no 
matter whether the task succeeds or fails;

if any accumulator values changed after the rdd computation, then the 
rdd's cache should be marked as usable/visible only when the task succeeds.

(Let me think further and see if it's doable.)
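
For illustration, a minimal self-contained Scala sketch of that condition (hypothetical types and names, not actual Spark internals):
{code:scala}
object RddCacheVisibilitySketch {
  // Hypothetical summary of a finished task attempt.
  case class TaskAttempt(succeeded: Boolean, accumUpdates: Seq[Long])

  // Proposed rule: if no accumulator values changed after the rdd computation,
  // the cached block is visible regardless of the task outcome; otherwise it
  // becomes visible only when the task succeeds.
  def cacheVisible(attempt: TaskAttempt): Boolean =
    attempt.accumUpdates.isEmpty || attempt.succeeded
}
{code}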


was (Author: ngone51):
I'm thinking if we could improve the improved Option 4 by changing the rdd 
cache reuse condition a bit:

 

if there're no accumulators (external only probably) values changed after the 
rdd computation, then the rdd's cache should be marked as usable/visible no 
matter whether the task succeeds or fail.

 

(let me think further and see if it's doable..)

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> success on computation
>   // (accumulator accounting, result caching) but only fail to report its 
> success status due
>   // to the concurrent executor lost. The second task attempt would success.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is actually the same with 2 copies) and the 
> decommissioned executor lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes but all of them are not ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across the task attempts for the same 
> task: this option can 100% fix the issue. The problem is this way can also 
> affect the case where rdd cache can be reused across the attempts (e.g., when 
> there is no accumulator operation in the task), which can have perf 
> regression;
> Option 3: Introduce accumulator cache: first, this requires a new framework 
> for supporting accumulator cache; second, the driver should improve its logic 
> to distinguish whether the accumulator cache value should be reported to the 
> user to avoid overcounting. For example, in the case of task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task trying to load the rdd 
> cache: this way defines a rdd cache is only valid/accessible if the task has 
> succeeded. This way could be either overkill or a bit complex (because 
> currently Spark would clean up the task state once it’s finished. So we need 
> to maintain a structure to know if task once succeeded or not. )

[jira] [Commented] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers

2022-12-13 Thread Ohad Raviv (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646973#comment-17646973
 ] 

Ohad Raviv commented on SPARK-41510:


OK, after diving into the code I think I found what I was looking for:
{code:java}
spark._sc._python_includes.append("/shared_nfs/my_folder") {code}
but it is kind of hacky. Tell me what you think, and whether we could make it more 
official/documented.

> Support easy way for user defined PYTHONPATH in workers
> ---
>
> Key: SPARK-41510
> URL: https://issues.apache.org/jira/browse/SPARK-41510
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.1
>Reporter: Ohad Raviv
>Priority: Minor
>
> When working interactively with Spark through notebooks in various envs - 
> Databricks/YARN I often encounter a very frustrating process of trying to add 
> new python modules and even change their code without starting a new spark 
> session/cluster.
> On the driver side it is easy to add things like `sys.path.append()`, but if, 
> for example, a UDF imports a function from a local module, then the pickle 
> boundaries will assume that the module exists on the workers, and it will 
> fail with "python module does not exist..".
> To update the code "online" I can add an NFS volume to the workers' PYTHONPATH.
> However, setting the PYTHONPATH on the workers is not easy, as it gets 
> overridden by someone (Databricks/Spark) along the way. A few ugly 
> workarounds are suggested, like running a "dummy" UDF on the workers to add 
> the folder to sys.path.
> I think all of that could easily be solved if we just add a dedicated 
> `spark.conf` that will get merged into the worker's PYTHONPATH, just here:
> [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94]
>  
> please tell me what you think, and I will make the PR.
> thanks.
>  
>  
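
For illustration, a small self-contained Scala sketch of the proposed merge; the helper and the idea of a dedicated config key are hypothetical, not an existing Spark setting:
{code:scala}
import java.io.File

object PythonPathMergeSketch {
  // Hypothetical: merge user-supplied entries (e.g. from a dedicated spark conf)
  // into the PYTHONPATH the Python worker is launched with.
  def mergePythonPath(base: String, userEntries: Seq[String]): String =
    (base.split(File.pathSeparator).toSeq ++ userEntries)
      .filter(_.nonEmpty)
      .distinct
      .mkString(File.pathSeparator)

  def main(args: Array[String]): Unit =
    // prints "/opt/spark/python:/shared_nfs/my_folder" on Unix-like systems
    println(mergePythonPath("/opt/spark/python", Seq("/shared_nfs/my_folder")))
}
{code}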



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646972#comment-17646972
 ] 

Mridul Muralidharan commented on SPARK-41497:
-

[~Ngone51] Agree, that is what I was not sure of.
I would expect this scenario (even without accumulators) to be infrequent 
enough that the cost of extra recomputation might be fine.

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> success on computation
>   // (accumulator accounting, result caching) but only fail to report its 
> success status due
>   // to the concurrent executor lost. The second task attempt would success.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is actually the same with 2 copies) and the 
> decommissioned executor lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes but all of them are not ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across the task attempts for the same 
> task: this option can 100% fix the issue. The problem is this way can also 
> affect the case where rdd cache can be reused across the attempts (e.g., when 
> there is no accumulator operation in the task), which can have perf 
> regression;
> Option 3: Introduce accumulator cache: first, this requires a new framework 
> for supporting accumulator cache; second, the driver should improve its logic 
> to distinguish whether the accumulator cache value should be reported to the 
> user to avoid overcounting. For example, in the case of task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task trying to load the rdd 
> cache: this way defines a rdd cache is only valid/accessible if the task has 
> succeeded. This way could be either overkill or a bit complex (because 
> currently Spark would clean up the task state once it’s finished. So we need 
> to maintain a structure to know if task once succeeded or not. )
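
For illustration, a rough self-contained Scala sketch of the bookkeeping Option 4 would need (hypothetical structure, not the actual scheduler state):
{code:scala}
import scala.collection.mutable

object TaskSuccessValidationSketch {
  // Hypothetical Option 4 bookkeeping: remember which (stageId, partitionId) tasks
  // have ever succeeded, and only serve a cached rdd block written by a task that
  // is known to have succeeded.
  private val succeededTasks = mutable.Set.empty[(Int, Int)]

  def markSucceeded(stageId: Int, partitionId: Int): Unit =
    succeededTasks += ((stageId, partitionId))

  def cacheUsable(stageId: Int, partitionId: Int): Boolean =
    succeededTasks.contains((stageId, partitionId))
}
{code}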



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646971#comment-17646971
 ] 

wuyi commented on SPARK-41497:
--

I'm thinking about whether we could improve the improved Option 4 by changing the rdd 
cache reuse condition a bit:

If no accumulator values (probably external accumulators only) changed after the 
rdd computation, then the rdd's cache should be marked as usable/visible no 
matter whether the task succeeds or fails.

 

(let me think further and see if it's doable..)

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> success on computation
>   // (accumulator accounting, result caching) but only fail to report its 
> success status due
>   // to the concurrent executor lost. The second task attempt would success.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is actually the same with 2 copies) and the 
> decommissioned executor lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes but all of them are not ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across the task attempts for the same 
> task: this option can 100% fix the issue. The problem is this way can also 
> affect the case where rdd cache can be reused across the attempts (e.g., when 
> there is no accumulator operation in the task), which can have perf 
> regression;
> Option 3: Introduce accumulator cache: first, this requires a new framework 
> for supporting accumulator cache; second, the driver should improve its logic 
> to distinguish whether the accumulator cache value should be reported to the 
> user to avoid overcounting. For example, in the case of task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task trying to load the rdd 
> cache: this way defines a rdd cache is only valid/accessible if the task has 
> succeeded. This way could be either overkill or a bit complex (because 
> currently Spark would clean up the task state once it’s finished. So we need 
> to maintain a structure to know if task once succeeded or not. )



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646969#comment-17646969
 ] 

wuyi commented on SPARK-41497:
--

> do we have a way to do that ?

 

[~mridulm80]  Currently, we only have the mapping between the task and 
accumulators. Accumulators are registered to the task via TaskContext.get() 
when they are deserialized at the executor.

If we could have a way to know which RDD scope the accumulator is within when 
deserializing, we could then set up the mapping between the RDD and 
accumulators. This would probably be the most difficult part.
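
For illustration, a rough self-contained Scala sketch of such an RDD-to-accumulator mapping (hypothetical registry, not Spark code):
{code:scala}
import scala.collection.mutable

object RddAccumulatorRegistrySketch {
  // Hypothetical registry populated at deserialization time:
  // rddId -> ids of the accumulators used within that rdd's scope.
  private val accumsByRdd = mutable.Map.empty[Int, mutable.Set[Long]]

  def register(rddId: Int, accumId: Long): Unit =
    accumsByRdd.getOrElseUpdate(rddId, mutable.Set.empty) += accumId

  // A cached rdd with no registered accumulators could be reused unconditionally.
  def hasAccumulators(rddId: Int): Boolean =
    accumsByRdd.get(rddId).exists(_.nonEmpty)
}
{code}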

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> success on computation
>   // (accumulator accounting, result caching) but only fail to report its 
> success status due
>   // to the concurrent executor lost. The second task attempt would success.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is actually the same with 2 copies) and the 
> decommissioned executor lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes but all of them are not ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across the task attempts for the same 
> task: this option can 100% fix the issue. The problem is this way can also 
> affect the case where rdd cache can be reused across the attempts (e.g., when 
> there is no accumulator operation in the task), which can have perf 
> regression;
> Option 3: Introduce accumulator cache: first, this requires a new framework 
> for supporting accumulator cache; second, the driver should improve its logic 
> to distinguish whether the accumulator cache value should be reported to the 
> user to avoid overcounting. For example, in the case of task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task trying to load the rdd 
> cache: this way defines a rdd cache is only valid/accessible if the task has 
> succeeded. This way could be either overkill or a bit complex (because 
> currently Spark would clean up the task state once it’s finished. So we need 
> to maintain a structure to know if task once succeeded or not. )



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

[jira] [Comment Edited] (SPARK-38719) Test the error class: CANNOT_CAST_DATATYPE

2022-12-13 Thread Jayadeep Jayaraman (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646963#comment-17646963
 ] 

Jayadeep Jayaraman edited comment on SPARK-38719 at 12/14/22 6:56 AM:
--

[~maxgekk] - I tried creating the failure as shown below, but somehow this 
specific error does not show up. Can you suggest what the issue might be?
{code:java}
scala> val null_data = Seq(
     |   (1, ("ABC",null)),
     |   (2, ("MNO",null)),
     |   (3, ("PQR",null))
     |   )
null_data: Seq[(Int, (String, Null))] = List((1,(ABC,null)), (2,(MNO,null)), (3,(PQR,null)))

scala> val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: null>]

scala> df.printSchema()
root
 |-- _1: integer (nullable = false)
 |-- _2: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: null (nullable = true)

scala> df.withColumn("_2._2",col("_2._2").cast(IntegerType)).show()
+---+-----------+-----+
| _1|         _2|_2._2|
+---+-----------+-----+
|  1|{ABC, null}| null|
|  2|{MNO, null}| null|
|  3|{PQR, null}| null|
+---+-----------+-----+
{code}


was (Author: jjayadeep):
[~maxgekk] - I tried creating the failure as shown below, but somehow this 
specific error does not show up. Can you suggest what can be the issue ?
{code:java}
// scala> val null_data = Seq(
     |   (1, ("ABC",null,"value12")),
     |   (2, ("MNO",null,"value22")),
     |   (3, ("PQR",null,"value32"))
     |   )
null_data: Seq[(Int, (String, Null, String))] = List((1,(ABC,null,value12)), 
(2,(MNO,null,value22)), (3,(PQR,null,value32)))scala>  val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: null 
... 1 more field>]scala> val null_data = Seq(
     |   (1, ("ABC",null)),
     |   (2, ("MNO",null)),
     |   (3, ("PQR",null))
     |   )
null_data: Seq[(Int, (String, Null))] = List((1,(ABC,null)), (2,(MNO,null)), 
(3,(PQR,null)))scala> scala>  val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: 
null>]scala> df.printSchema()
root
 |-- _1: integer (nullable = false)
 |-- _2: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: null (nullable = true)
scala>  df.withColumn("_2._2",col("_2._2").cast(IntegerType)).show()
+---+---+-+
| _1|         _2|_2._2|
+---+---+-+
|  1|{ABC, null}| null|
|  2|{MNO, null}| null|
|  3|{PQR, null}| null|
+---+---+-+{code}

> Test the error class: CANNOT_CAST_DATATYPE
> --
>
> Key: SPARK-38719
> URL: https://issues.apache.org/jira/browse/SPARK-38719
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Max Gekk
>Priority: Minor
>  Labels: starter
>
> Add at least one test for the error class *CANNOT_CAST_DATATYPE* to 
> QueryExecutionErrorsSuite. The test should cover the exception throw in 
> QueryExecutionErrors:
> {code:scala}
>   def cannotCastFromNullTypeError(to: DataType): Throwable = {
> new SparkException(errorClass = "CANNOT_CAST_DATATYPE",
>   messageParameters = Array(NullType.typeName, to.typeName), null)
>   }
> {code}
> For example, here is a test for the error class *UNSUPPORTED_FEATURE*: 
> https://github.com/apache/spark/blob/34e3029a43d2a8241f70f2343be8285cb7f231b9/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala#L151-L170
> +The test must have a check of:+
> # the entire error message
> # sqlState if it is defined in the error-classes.json file
> # the error class
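
For illustration, a hedged Scala sketch of what such a test could look like; the triggering query is left as a placeholder, and the suite's ScalaTest helpers (test, intercept) are assumed:
{code:scala}
test("CANNOT_CAST_DATATYPE: error class is raised when casting from NullType") {
  val e = intercept[org.apache.spark.SparkException] {
    // Placeholder: run an action that reaches QueryExecutionErrors.cannotCastFromNullTypeError
  }
  assert(e.getErrorClass === "CANNOT_CAST_DATATYPE")
  // Also check the entire error message and, if defined in error-classes.json, the sqlState.
}
{code}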



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-38719) Test the error class: CANNOT_CAST_DATATYPE

2022-12-13 Thread Jayadeep Jayaraman (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646963#comment-17646963
 ] 

Jayadeep Jayaraman edited comment on SPARK-38719 at 12/14/22 6:56 AM:
--

[~maxgekk] - I tried creating the failure as shown below, but somehow this 
specific error does not show up. Can you suggest what the issue might be?
{code:java}
// scala> val null_data = Seq(
     |   (1, ("ABC",null,"value12")),
     |   (2, ("MNO",null,"value22")),
     |   (3, ("PQR",null,"value32"))
     |   )
null_data: Seq[(Int, (String, Null, String))] = List((1,(ABC,null,value12)), 
(2,(MNO,null,value22)), (3,(PQR,null,value32)))scala>  val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: null 
... 1 more field>]scala> val null_data = Seq(
     |   (1, ("ABC",null)),
     |   (2, ("MNO",null)),
     |   (3, ("PQR",null))
     |   )
null_data: Seq[(Int, (String, Null))] = List((1,(ABC,null)), (2,(MNO,null)), 
(3,(PQR,null)))scala> scala>  val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: 
null>]scala> df.printSchema()
root
 |-- _1: integer (nullable = false)
 |-- _2: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: null (nullable = true)
scala>  df.withColumn("_2._2",col("_2._2").cast(IntegerType)).show()
+---+---+-+
| _1|         _2|_2._2|
+---+---+-+
|  1|{ABC, null}| null|
|  2|{MNO, null}| null|
|  3|{PQR, null}| null|
+---+---+-+{code}


was (Author: jjayadeep):
I tried creating the failure as shown below, but somehow this specific error 
does not show up. Can you suggest what can be the issue ?
{code:java}
// scala> val null_data = Seq(
     |   (1, ("ABC",null,"value12")),
     |   (2, ("MNO",null,"value22")),
     |   (3, ("PQR",null,"value32"))
     |   )
null_data: Seq[(Int, (String, Null, String))] = List((1,(ABC,null,value12)), 
(2,(MNO,null,value22)), (3,(PQR,null,value32)))scala>  val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: null 
... 1 more field>]scala> val null_data = Seq(
     |   (1, ("ABC",null)),
     |   (2, ("MNO",null)),
     |   (3, ("PQR",null))
     |   )
null_data: Seq[(Int, (String, Null))] = List((1,(ABC,null)), (2,(MNO,null)), 
(3,(PQR,null)))scala> scala>  val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: 
null>]scala> df.printSchema()
root
 |-- _1: integer (nullable = false)
 |-- _2: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: null (nullable = true)
scala>  df.withColumn("_2._2",col("_2._2").cast(IntegerType)).show()
+---+---+-+
| _1|         _2|_2._2|
+---+---+-+
|  1|{ABC, null}| null|
|  2|{MNO, null}| null|
|  3|{PQR, null}| null|
+---+---+-+{code}

> Test the error class: CANNOT_CAST_DATATYPE
> --
>
> Key: SPARK-38719
> URL: https://issues.apache.org/jira/browse/SPARK-38719
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Max Gekk
>Priority: Minor
>  Labels: starter
>
> Add at least one test for the error class *CANNOT_CAST_DATATYPE* to 
> QueryExecutionErrorsSuite. The test should cover the exception throw in 
> QueryExecutionErrors:
> {code:scala}
>   def cannotCastFromNullTypeError(to: DataType): Throwable = {
> new SparkException(errorClass = "CANNOT_CAST_DATATYPE",
>   messageParameters = Array(NullType.typeName, to.typeName), null)
>   }
> {code}
> For example, here is a test for the error class *UNSUPPORTED_FEATURE*: 
> https://github.com/apache/spark/blob/34e3029a43d2a8241f70f2343be8285cb7f231b9/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala#L151-L170
> +The test must have a check of:+
> # the entire error message
> # sqlState if it is defined in the error-classes.json file
> # the error class



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-38719) Test the error class: CANNOT_CAST_DATATYPE

2022-12-13 Thread Jayadeep Jayaraman (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646963#comment-17646963
 ] 

Jayadeep Jayaraman edited comment on SPARK-38719 at 12/14/22 6:55 AM:
--

I tried creating the failure as shown below, but somehow this specific error 
does not show up. Can you suggest what the issue might be?
{code:java}
// scala> val null_data = Seq(
     |   (1, ("ABC",null,"value12")),
     |   (2, ("MNO",null,"value22")),
     |   (3, ("PQR",null,"value32"))
     |   )
null_data: Seq[(Int, (String, Null, String))] = List((1,(ABC,null,value12)), 
(2,(MNO,null,value22)), (3,(PQR,null,value32)))scala>  val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: null 
... 1 more field>]scala> val null_data = Seq(
     |   (1, ("ABC",null)),
     |   (2, ("MNO",null)),
     |   (3, ("PQR",null))
     |   )
null_data: Seq[(Int, (String, Null))] = List((1,(ABC,null)), (2,(MNO,null)), 
(3,(PQR,null)))scala> scala>  val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: 
null>]scala> df.printSchema()
root
 |-- _1: integer (nullable = false)
 |-- _2: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: null (nullable = true)
scala>  df.withColumn("_2._2",col("_2._2").cast(IntegerType)).show()
+---+---+-+
| _1|         _2|_2._2|
+---+---+-+
|  1|{ABC, null}| null|
|  2|{MNO, null}| null|
|  3|{PQR, null}| null|
+---+---+-+{code}


was (Author: jjayadeep):
I tried creating the failure as shown below, but somehow this specific error 
does not show up. Can you suggest what can be the issue ?

```

scala> val null_data = Seq(
     |   (1, ("ABC",null,"value12")),
     |   (2, ("MNO",null,"value22")),
     |   (3, ("PQR",null,"value32"))
     |   )
null_data: Seq[(Int, (String, Null, String))] = List((1,(ABC,null,value12)), 
(2,(MNO,null,value22)), (3,(PQR,null,value32)))

scala>  val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: null 
... 1 more field>]

scala> val null_data = Seq(
     |   (1, ("ABC",null)),
     |   (2, ("MNO",null)),
     |   (3, ("PQR",null))
     |   )
null_data: Seq[(Int, (String, Null))] = List((1,(ABC,null)), (2,(MNO,null)), 
(3,(PQR,null)))

scala> 

scala>  val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: null>]

scala> df.printSchema()
root
 |-- _1: integer (nullable = false)
 |-- _2: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: null (nullable = true)


scala>  df.withColumn("_2._2",col("_2._2").cast(IntegerType)).show()
+---+---+-+
| _1|         _2|_2._2|
+---+---+-+
|  1|\{ABC, null}| null|
|  2|\{MNO, null}| null|
|  3|\{PQR, null}| null|
+---+---+-+
```

> Test the error class: CANNOT_CAST_DATATYPE
> --
>
> Key: SPARK-38719
> URL: https://issues.apache.org/jira/browse/SPARK-38719
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Max Gekk
>Priority: Minor
>  Labels: starter
>
> Add at least one test for the error class *CANNOT_CAST_DATATYPE* to 
> QueryExecutionErrorsSuite. The test should cover the exception throw in 
> QueryExecutionErrors:
> {code:scala}
>   def cannotCastFromNullTypeError(to: DataType): Throwable = {
> new SparkException(errorClass = "CANNOT_CAST_DATATYPE",
>   messageParameters = Array(NullType.typeName, to.typeName), null)
>   }
> {code}
> For example, here is a test for the error class *UNSUPPORTED_FEATURE*: 
> https://github.com/apache/spark/blob/34e3029a43d2a8241f70f2343be8285cb7f231b9/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala#L151-L170
> +The test must have a check of:+
> # the entire error message
> # sqlState if it is defined in the error-classes.json file
> # the error class



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38719) Test the error class: CANNOT_CAST_DATATYPE

2022-12-13 Thread Jayadeep Jayaraman (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646963#comment-17646963
 ] 

Jayadeep Jayaraman commented on SPARK-38719:


I tried creating the failure as shown below, but somehow this specific error 
does not show up. Can you suggest what the issue might be?

```

scala> val null_data = Seq(
     |   (1, ("ABC",null,"value12")),
     |   (2, ("MNO",null,"value22")),
     |   (3, ("PQR",null,"value32"))
     |   )
null_data: Seq[(Int, (String, Null, String))] = List((1,(ABC,null,value12)), 
(2,(MNO,null,value22)), (3,(PQR,null,value32)))

scala>  val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: null 
... 1 more field>]

scala> val null_data = Seq(
     |   (1, ("ABC",null)),
     |   (2, ("MNO",null)),
     |   (3, ("PQR",null))
     |   )
null_data: Seq[(Int, (String, Null))] = List((1,(ABC,null)), (2,(MNO,null)), 
(3,(PQR,null)))

scala> 

scala>  val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: null>]

scala> df.printSchema()
root
 |-- _1: integer (nullable = false)
 |-- _2: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: null (nullable = true)


scala>  df.withColumn("_2._2",col("_2._2").cast(IntegerType)).show()
+---+---+-+
| _1|         _2|_2._2|
+---+---+-+
|  1|\{ABC, null}| null|
|  2|\{MNO, null}| null|
|  3|\{PQR, null}| null|
+---+---+-+
```

> Test the error class: CANNOT_CAST_DATATYPE
> --
>
> Key: SPARK-38719
> URL: https://issues.apache.org/jira/browse/SPARK-38719
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Max Gekk
>Priority: Minor
>  Labels: starter
>
> Add at least one test for the error class *CANNOT_CAST_DATATYPE* to 
> QueryExecutionErrorsSuite. The test should cover the exception throw in 
> QueryExecutionErrors:
> {code:scala}
>   def cannotCastFromNullTypeError(to: DataType): Throwable = {
> new SparkException(errorClass = "CANNOT_CAST_DATATYPE",
>   messageParameters = Array(NullType.typeName, to.typeName), null)
>   }
> {code}
> For example, here is a test for the error class *UNSUPPORTED_FEATURE*: 
> https://github.com/apache/spark/blob/34e3029a43d2a8241f70f2343be8285cb7f231b9/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala#L151-L170
> +The test must have a check of:+
> # the entire error message
> # sqlState if it is defined in the error-classes.json file
> # the error class



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646962#comment-17646962
 ] 

Mridul Muralidharan commented on SPARK-41497:
-

Agree, if we can determine that - do we have a way to do that ?

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> success on computation
>   // (accumulator accounting, result caching) but only fail to report its 
> success status due
>   // to the concurrent executor lost. The second task attempt would success.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is actually the same with 2 copies) and the 
> decommissioned executor lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes but all of them are not ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across the task attempts for the same 
> task: this option can 100% fix the issue. The problem is this way can also 
> affect the case where rdd cache can be reused across the attempts (e.g., when 
> there is no accumulator operation in the task), which can have perf 
> regression;
> Option 3: Introduce accumulator cache: first, this requires a new framework 
> for supporting accumulator cache; second, the driver should improve its logic 
> to distinguish whether the accumulator cache value should be reported to the 
> user to avoid overcounting. For example, in the case of task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task trying to load the rdd 
> cache: this way defines a rdd cache is only valid/accessible if the task has 
> succeeded. This way could be either overkill or a bit complex (because 
> currently Spark would clean up the task state once it’s finished. So we need 
> to maintain a structure to know if task once succeeded or not. )



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41506) Refactor LiteralExpression to support DataType

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646960#comment-17646960
 ] 

Apache Spark commented on SPARK-41506:
--

User 'dengziming' has created a pull request for this issue:
https://github.com/apache/spark/pull/39059

> Refactor LiteralExpression to support DataType
> --
>
> Key: SPARK-41506
> URL: https://issues.apache.org/jira/browse/SPARK-41506
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, PySpark
>Affects Versions: 3.4.0
>Reporter: Ruifeng Zheng
>Assignee: Ruifeng Zheng
>Priority: Major
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-41515) PVC-oriented executor pod allocation

2022-12-13 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-41515:
--
Labels: releasenotes  (was: )

> PVC-oriented executor pod allocation
> 
>
> Key: SPARK-41515
> URL: https://issues.apache.org/jira/browse/SPARK-41515
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 3.4.0
>Reporter: Dongjoon Hyun
>Priority: Major
>  Labels: releasenotes
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-39324) Log ExecutorDecommission as INFO level in TaskSchedulerImpl

2022-12-13 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-39324:
--
Parent: SPARK-41515
Issue Type: Sub-task  (was: Improvement)

> Log ExecutorDecommission as INFO level in TaskSchedulerImpl
> ---
>
> Key: SPARK-39324
> URL: https://issues.apache.org/jira/browse/SPARK-39324
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Major
> Fix For: 3.4.0
>
>
> Like the other module, `TaskSchedulerImpl` should log the decommission as 
> `INFO` level.
> {code}
> 22/05/28 01:25:28 INFO KubernetesClusterSchedulerBackend: Decommission 
> executors: 8
> 22/05/28 01:25:28 INFO KubernetesClusterSchedulerBackend: Notify executor 8 
> to decommissioning.
> 22/05/28 01:25:28 INFO BlockManagerMasterEndpoint: Mark BlockManagers 
> (BlockManagerId(8, 100.103.40.13, 43353, None)) as being decommissioning.
> 22/05/28 01:25:29 ERROR TaskSchedulerImpl: Lost executor 8 on 100.103.40.13: 
> Executor decommission.
> 22/05/28 01:25:29 INFO ExecutorMonitor: Executor 8 is removed. Remove reason 
> statistics: ...
> 22/05/28 01:25:29 INFO DAGScheduler: Executor lost: 8 (epoch 7)
> 22/05/28 01:25:29 INFO BlockManagerMasterEndpoint: Trying to remove executor 
> 8 from BlockManagerMaster.
> 22/05/28 01:25:29 INFO BlockManagerMasterEndpoint: Removing block manager 
> BlockManagerId(8, 100.103.40.13, 43353, None)
> 22/05/28 01:25:29 INFO BlockManagerMaster: Removed 8 successfully in 
> removeExecutor
> 22/05/28 01:25:29 INFO DAGScheduler: Shuffle files lost for executor: 8 
> (epoch 7)
> 22/05/28 01:25:34 INFO BlockManagerMaster: Removal of executor 8 requested
> 22/05/28 01:25:34 INFO BlockManagerMasterEndpoint: Trying to remove executor 
> 8 from BlockManagerMaster.
> 22/05/28 01:25:34 INFO 
> KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asked to remove 
> non-existent executor 8
> {code}
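
For illustration, a tiny self-contained Scala sketch of the intended behavior with simplified stand-in types (not the actual TaskSchedulerImpl patch): decommission-driven executor loss is logged at INFO, while other loss reasons stay at ERROR.
{code:scala}
object DecommissionLogLevelSketch {
  // Simplified stand-ins for Spark's ExecutorLossReason hierarchy.
  sealed trait LossReason
  case object Decommission extends LossReason
  final case class Other(message: String) extends LossReason

  def logExecutorLoss(executorId: String, reason: LossReason): Unit = reason match {
    case Decommission   => println(s"INFO Lost executor $executorId: executor decommission")
    case Other(message) => println(s"ERROR Lost executor $executorId: $message")
  }
}
{code}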



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-39450) Reuse PVCs by default

2022-12-13 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-39450:
--
Parent: SPARK-41515
Issue Type: Sub-task  (was: Improvement)

> Reuse PVCs by default
> -
>
> Key: SPARK-39450
> URL: https://issues.apache.org/jira/browse/SPARK-39450
> Project: Spark
>  Issue Type: Sub-task
>  Components: Kubernetes
>Affects Versions: 3.4.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Major
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-39688) getReusablePVCs should handle accounts with no PVC permission

2022-12-13 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-39688:
--
Parent: SPARK-41515
Issue Type: Sub-task  (was: Bug)

> getReusablePVCs should handle accounts with no PVC permission
> -
>
> Key: SPARK-39688
> URL: https://issues.apache.org/jira/browse/SPARK-39688
> Project: Spark
>  Issue Type: Sub-task
>  Components: Kubernetes
>Affects Versions: 3.4.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Major
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-39898) Upgrade kubernetes-client to 5.12.3

2022-12-13 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-39898:
--
Parent: SPARK-41515
Issue Type: Sub-task  (was: Bug)

> Upgrade kubernetes-client to 5.12.3
> ---
>
> Key: SPARK-39898
> URL: https://issues.apache.org/jira/browse/SPARK-39898
> Project: Spark
>  Issue Type: Sub-task
>  Components: Build, Kubernetes
>Affects Versions: 3.4.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Major
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-39846) Enable spark.dynamicAllocation.shuffleTracking.enabled by default

2022-12-13 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-39846:
--
Parent: SPARK-41515
Issue Type: Sub-task  (was: Improvement)

> Enable spark.dynamicAllocation.shuffleTracking.enabled by default
> -
>
> Key: SPARK-39846
> URL: https://issues.apache.org/jira/browse/SPARK-39846
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Major
> Fix For: 3.4.0
>
>
> This issue aims to make Spark 3.4 track shuffle data when dynamic 
> allocation is enabled without an external shuffle service.
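
For context, a short Scala sketch of the two settings involved (real config keys from the title; enabling them explicitly is what users had to do before this change):
{code:scala}
import org.apache.spark.SparkConf

object ShuffleTrackingConfSketch {
  // SPARK-39846 proposes making shuffle tracking the default, so that dynamic
  // allocation works without an external shuffle service out of the box.
  val conf: SparkConf = new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
}
{code}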



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38719) Test the error class: CANNOT_CAST_DATATYPE

2022-12-13 Thread Max Gekk (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646942#comment-17646942
 ] 

Max Gekk commented on SPARK-38719:
--

[~jjayadeep] Sure, go ahead.

> Test the error class: CANNOT_CAST_DATATYPE
> --
>
> Key: SPARK-38719
> URL: https://issues.apache.org/jira/browse/SPARK-38719
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Max Gekk
>Priority: Minor
>  Labels: starter
>
> Add at least one test for the error class *CANNOT_CAST_DATATYPE* to 
> QueryExecutionErrorsSuite. The test should cover the exception throw in 
> QueryExecutionErrors:
> {code:scala}
>   def cannotCastFromNullTypeError(to: DataType): Throwable = {
> new SparkException(errorClass = "CANNOT_CAST_DATATYPE",
>   messageParameters = Array(NullType.typeName, to.typeName), null)
>   }
> {code}
> For example, here is a test for the error class *UNSUPPORTED_FEATURE*: 
> https://github.com/apache/spark/blob/34e3029a43d2a8241f70f2343be8285cb7f231b9/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala#L151-L170
> +The test must have a check of:+
> # the entire error message
> # sqlState if it is defined in the error-classes.json file
> # the error class



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-39965) Skip PVC cleanup when driver doesn't own PVCs

2022-12-13 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-39965:
--
Parent: SPARK-41515
Issue Type: Sub-task  (was: Bug)

> Skip PVC cleanup when driver doesn't own PVCs
> -
>
> Key: SPARK-39965
> URL: https://issues.apache.org/jira/browse/SPARK-39965
> Project: Spark
>  Issue Type: Sub-task
>  Components: Kubernetes
>Affects Versions: 3.3.0
>Reporter: Pralabh Kumar
>Assignee: Pralabh Kumar
>Priority: Trivial
> Fix For: 3.3.1, 3.2.3, 3.4.0
>
>
> Since Spark 3.2, as part of [https://github.com/apache/spark/pull/32288], 
> functionality was added to delete PVCs if the Spark driver died. 
> [https://github.com/apache/spark/blob/786a70e710369b195d7c117b33fe9983044014d6/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala#L144]
>  
> However, there are cases where Spark on K8s doesn't use PVCs and uses host 
> paths for storage. 
> [https://spark.apache.org/docs/latest/running-on-kubernetes.html#using-kubernetes-volumes]
>  
> Now, in those cases:
>  * it requests PVC deletion (which is not required);
>  * it also tries to delete PVCs when the driver doesn't own the PVs (or 
> spark.kubernetes.driver.ownPersistentVolumeClaim is false);
>  * moreover, in clusters where the Spark user doesn't have access to list or 
> delete PVCs, it throws an exception:
>  
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: 
> GET at: 
> [https://kubernetes.default.svc/api/v1/namespaces/<>/persistentvolumeclaims?labelSelector=spark-app-selector%3Dspark-332bd09284b3442f8a6a214fabcd6ab1|https://kubernetes.default.svc/api/v1/namespaces/dpi-dev/persistentvolumeclaims?labelSelector=spark-app-selector%3Dspark-332bd09284b3442f8a6a214fabcd6ab1].
>  Message: Forbidden!Configured service account doesn't have access. Service 
> account may have been revoked. persistentvolumeclaims is forbidden: User 
> "system:serviceaccount:dpi-dev:spark" cannot list resource 
> "persistentvolumeclaims" in API group "" in the namespace "<>".
>  
> *Solution*
> Ideally there should be a configuration such as 
> spark.kubernetes.driver.pvc.deleteOnTermination, or 
> spark.kubernetes.driver.ownPersistentVolumeClaim should be checked, 
> before calling the API to delete PVCs. If the user has not set up PVs, or the 
> driver doesn't own them, then there is no need to call the API and delete the PVCs.
>  
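
For illustration, a minimal Scala sketch of the proposed guard; the flag names come from the description above, and the deletePvcs callback is hypothetical:
{code:scala}
object PvcCleanupGuardSketch {
  // Only attempt PVC deletion when the driver actually owns PVCs and a
  // delete-on-termination style flag is enabled.
  def maybeCleanupPvcs(
      driverOwnsPvcs: Boolean,      // spark.kubernetes.driver.ownPersistentVolumeClaim
      deleteOnTermination: Boolean, // proposed spark.kubernetes.driver.pvc.deleteOnTermination
      deletePvcs: () => Unit): Unit = {
    if (driverOwnsPvcs && deleteOnTermination) {
      deletePvcs()
    }
  }
}
{code}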



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-40198) Enable spark.storage.decommission.(rdd|shuffle)Blocks.enabled by default

2022-12-13 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-40198:
--
Parent: SPARK-41515
Issue Type: Sub-task  (was: Improvement)

> Enable spark.storage.decommission.(rdd|shuffle)Blocks.enabled by default
> 
>
> Key: SPARK-40198
> URL: https://issues.apache.org/jira/browse/SPARK-40198
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Major
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-40304) Add decomTestTag to K8s Integration Test

2022-12-13 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-40304:
--
Parent: SPARK-41515
Issue Type: Sub-task  (was: Test)

> Add decomTestTag to K8s Integration Test
> 
>
> Key: SPARK-40304
> URL: https://issues.apache.org/jira/browse/SPARK-40304
> Project: Spark
>  Issue Type: Sub-task
>  Components: Kubernetes, Tests
>Affects Versions: 3.4.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Minor
> Fix For: 3.3.1, 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-40459) recoverDiskStore should not stop by existing recomputed files

2022-12-13 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-40459:
--
Parent: SPARK-41515
Issue Type: Sub-task  (was: Bug)

> recoverDiskStore should not stop by existing recomputed files
> -
>
> Key: SPARK-40459
> URL: https://issues.apache.org/jira/browse/SPARK-40459
> Project: Spark
>  Issue Type: Sub-task
>  Components: Kubernetes
>Affects Versions: 3.2.3, 3.3.2
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Major
> Fix For: 3.3.1, 3.2.3, 3.4.0
>
>
> {code:java}
> org.apache.commons.io.FileExistsException: File element in parameter 'null' 
> already exists: '...'
>   at org.apache.commons.io.FileUtils.requireAbsent(FileUtils.java:2587)
>   at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2305)
>   at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2283)
>   at 
> org.apache.spark.storage.DiskStore.moveFileToBlock(DiskStore.scala:150)
>   at 
> org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.saveToDiskStore(BlockManager.scala:487)
>   at 
> org.apache.spark.storage.BlockManager$BlockStoreUpdater.$anonfun$save$1(BlockManager.scala:407)
>   at 
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1445)
>   at 
> org.apache.spark.storage.BlockManager$BlockStoreUpdater.save(BlockManager.scala:380)
>   at 
> org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.save(BlockManager.scala:490)
>   at 
> org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.$anonfun$recoverDiskStore$14(KubernetesLocalDiskShuffleExecutorComponents.scala:95)
>   at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>   at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>   at 
> org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.recoverDiskStore(KubernetesLocalDiskShuffleExecutorComponents.scala:91)
>  {code}
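
For illustration, a hedged sketch of the kind of guard that would keep recovery going when a target file already exists (the Commons IO calls are real; the helper and its wiring into the recovery loop are hypothetical, not the actual fix):

{code:scala}
import java.io.File
import org.apache.commons.io.FileUtils

// Hypothetical sketch: tolerate an already-present destination (e.g. a block that
// was recomputed in the meantime) instead of letting FileExistsException abort
// the whole recovery loop.
def moveIgnoringExisting(src: File, dest: File): Unit = {
  if (dest.exists()) {
    FileUtils.deleteQuietly(src) // destination already recovered; drop the extra copy
  } else {
    FileUtils.moveFile(src, dest)
  }
}
{code}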



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-41388) getReusablePVCs should ignore recently created PVCs in the previous batch

2022-12-13 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-41388:
--
Parent: SPARK-41515
Issue Type: Sub-task  (was: Bug)

> getReusablePVCs should ignore recently created PVCs in the previous batch
> -
>
> Key: SPARK-41388
> URL: https://issues.apache.org/jira/browse/SPARK-41388
> Project: Spark
>  Issue Type: Sub-task
>  Components: Kubernetes
>Affects Versions: 3.2.2, 3.3.1, 3.4.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Major
> Fix For: 3.2.4, 3.3.2, 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41514) Add `PVC-oriented executor pod allocation` section and revise config name

2022-12-13 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun reassigned SPARK-41514:
-

Assignee: Dongjoon Hyun

> Add `PVC-oriented executor pod allocation` section and revise config name
> -
>
> Key: SPARK-41514
> URL: https://issues.apache.org/jira/browse/SPARK-41514
> Project: Spark
>  Issue Type: Sub-task
>  Components: Documentation, Kubernetes
>Affects Versions: 3.4.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-41410) Support PVC-oriented executor pod allocation

2022-12-13 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-41410:
--
Parent: SPARK-41515
Issue Type: Sub-task  (was: New Feature)

> Support PVC-oriented executor pod allocation
> 
>
> Key: SPARK-41410
> URL: https://issues.apache.org/jira/browse/SPARK-41410
> Project: Spark
>  Issue Type: Sub-task
>  Components: Kubernetes
>Affects Versions: 3.4.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Major
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-41514) Add `PVC-oriented executor pod allocation` section and revise config name

2022-12-13 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-41514:
--
Parent: SPARK-41515
Issue Type: Sub-task  (was: Documentation)

> Add `PVC-oriented executor pod allocation` section and revise config name
> -
>
> Key: SPARK-41514
> URL: https://issues.apache.org/jira/browse/SPARK-41514
> Project: Spark
>  Issue Type: Sub-task
>  Components: Documentation, Kubernetes
>Affects Versions: 3.4.0
>Reporter: Dongjoon Hyun
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41515) PVC-oriented executor pod allocation

2022-12-13 Thread Dongjoon Hyun (Jira)
Dongjoon Hyun created SPARK-41515:
-

 Summary: PVC-oriented executor pod allocation
 Key: SPARK-41515
 URL: https://issues.apache.org/jira/browse/SPARK-41515
 Project: Spark
  Issue Type: New Feature
  Components: Kubernetes
Affects Versions: 3.4.0
Reporter: Dongjoon Hyun






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38719) Test the error class: CANNOT_CAST_DATATYPE

2022-12-13 Thread Jayadeep Jayaraman (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646938#comment-17646938
 ] 

Jayadeep Jayaraman commented on SPARK-38719:


Hi [~maxgekk] - I would like to work on this task. Let me know if that would be 
okay.

> Test the error class: CANNOT_CAST_DATATYPE
> --
>
> Key: SPARK-38719
> URL: https://issues.apache.org/jira/browse/SPARK-38719
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Max Gekk
>Priority: Minor
>  Labels: starter
>
> Add at least one test for the error class *CANNOT_CAST_DATATYPE* to 
> QueryExecutionErrorsSuite. The test should cover the exception thrown in 
> QueryExecutionErrors:
> {code:scala}
>   def cannotCastFromNullTypeError(to: DataType): Throwable = {
> new SparkException(errorClass = "CANNOT_CAST_DATATYPE",
>   messageParameters = Array(NullType.typeName, to.typeName), null)
>   }
> {code}
> For example, here is a test for the error class *UNSUPPORTED_FEATURE*: 
> https://github.com/apache/spark/blob/34e3029a43d2a8241f70f2343be8285cb7f231b9/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala#L151-L170
> +The test must have a check of:+
> # the entire error message
> # sqlState if it is defined in the error-classes.json file
> # the error class
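
A minimal sketch of such a check (hedged: it exercises the error builder directly and assumes it is reachable from the suite's package; the real test should follow the suite's conventions and trigger the error through a user-facing code path):

{code:scala}
import org.apache.spark.SparkException
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types.{NullType, StringType}

// Hypothetical sketch: build the error and verify its class and message parameters.
val e = QueryExecutionErrors.cannotCastFromNullTypeError(StringType)
  .asInstanceOf[SparkException]
assert(e.getErrorClass == "CANNOT_CAST_DATATYPE")
assert(e.getMessage.contains(NullType.typeName))
assert(e.getMessage.contains(StringType.typeName))
{code}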



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41248) Add config flag to control before of JSON partial results parsing in SPARK-40646

2022-12-13 Thread Max Gekk (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Max Gekk reassigned SPARK-41248:


Assignee: Ivan Sadikov

> Add config flag to control before of JSON partial results parsing in 
> SPARK-40646
> 
>
> Key: SPARK-41248
> URL: https://issues.apache.org/jira/browse/SPARK-41248
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Ivan Sadikov
>Assignee: Ivan Sadikov
>Priority: Major
> Attachments: json-benchmark-with-SPARK-40646.log, 
> json-benchmark-without-SPARK-40646.log
>
>
> This is a follow-up for https://issues.apache.org/jira/browse/SPARK-40646.
>  
> It was observed in internal benchmarks that the JSON partial results parsing 
> can be 30% slower compared to parsing without the patch. I could not find a 
> regression and the Apache Spark JSON benchmark results are very similar with 
> and without SPARK-40646.
> However, I would still like to add a config flag to enable/disable the 
> feature in the case the regression is observed in users' queries.
> Benchmark results are attached below.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-41248) Add config flag to control before of JSON partial results parsing in SPARK-40646

2022-12-13 Thread Max Gekk (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Max Gekk resolved SPARK-41248.
--
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 38784
[https://github.com/apache/spark/pull/38784]

> Add config flag to control before of JSON partial results parsing in 
> SPARK-40646
> 
>
> Key: SPARK-41248
> URL: https://issues.apache.org/jira/browse/SPARK-41248
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Ivan Sadikov
>Assignee: Ivan Sadikov
>Priority: Major
> Fix For: 3.4.0
>
> Attachments: json-benchmark-with-SPARK-40646.log, 
> json-benchmark-without-SPARK-40646.log
>
>
> This is a follow-up for https://issues.apache.org/jira/browse/SPARK-40646.
>  
> It was observed in internal benchmarks that the JSON partial results parsing 
> can be 30% slower compared to parsing without the patch. I could not find a 
> regression and the Apache Spark JSON benchmark results are very similar with 
> and without SPARK-40646.
> However, I would still like to add a config flag to enable/disable the 
> feature in the case the regression is observed in users' queries.
> Benchmark results are attached below.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41514) Add `PVC-oriented executor pod allocation` section and revise config name

2022-12-13 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-41514:


Assignee: Apache Spark

> Add `PVC-oriented executor pod allocation` section and revise config name
> -
>
> Key: SPARK-41514
> URL: https://issues.apache.org/jira/browse/SPARK-41514
> Project: Spark
>  Issue Type: Documentation
>  Components: Documentation, Kubernetes
>Affects Versions: 3.4.0
>Reporter: Dongjoon Hyun
>Assignee: Apache Spark
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41514) Add `PVC-oriented executor pod allocation` section and revise config name

2022-12-13 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-41514:


Assignee: (was: Apache Spark)

> Add `PVC-oriented executor pod allocation` section and revise config name
> -
>
> Key: SPARK-41514
> URL: https://issues.apache.org/jira/browse/SPARK-41514
> Project: Spark
>  Issue Type: Documentation
>  Components: Documentation, Kubernetes
>Affects Versions: 3.4.0
>Reporter: Dongjoon Hyun
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41514) Add `PVC-oriented executor pod allocation` section and revise config name

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646931#comment-17646931
 ] 

Apache Spark commented on SPARK-41514:
--

User 'dongjoon-hyun' has created a pull request for this issue:
https://github.com/apache/spark/pull/39058

> Add `PVC-oriented executor pod allocation` section and revise config name
> -
>
> Key: SPARK-41514
> URL: https://issues.apache.org/jira/browse/SPARK-41514
> Project: Spark
>  Issue Type: Documentation
>  Components: Documentation, Kubernetes
>Affects Versions: 3.4.0
>Reporter: Dongjoon Hyun
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41514) Add `PVC-oriented executor pod allocation` section and revise config name

2022-12-13 Thread Dongjoon Hyun (Jira)
Dongjoon Hyun created SPARK-41514:
-

 Summary: Add `PVC-oriented executor pod allocation` section and 
revise config name
 Key: SPARK-41514
 URL: https://issues.apache.org/jira/browse/SPARK-41514
 Project: Spark
  Issue Type: Documentation
  Components: Documentation, Kubernetes
Affects Versions: 3.4.0
Reporter: Dongjoon Hyun






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41409) Reuse `WRONG_NUM_ARGS` instead of `_LEGACY_ERROR_TEMP_1043`

2022-12-13 Thread Max Gekk (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Max Gekk reassigned SPARK-41409:


Assignee: Yang Jie

> Reuse `WRONG_NUM_ARGS` instead of `_LEGACY_ERROR_TEMP_1043`
> ---
>
> Key: SPARK-41409
> URL: https://issues.apache.org/jira/browse/SPARK-41409
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Yang Jie
>Assignee: Yang Jie
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-41409) Reuse `WRONG_NUM_ARGS` instead of `_LEGACY_ERROR_TEMP_1043`

2022-12-13 Thread Max Gekk (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Max Gekk resolved SPARK-41409.
--
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 38940
[https://github.com/apache/spark/pull/38940]

> Reuse `WRONG_NUM_ARGS` instead of `_LEGACY_ERROR_TEMP_1043`
> ---
>
> Key: SPARK-41409
> URL: https://issues.apache.org/jira/browse/SPARK-41409
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Yang Jie
>Assignee: Yang Jie
>Priority: Minor
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646919#comment-17646919
 ] 

wuyi commented on SPARK-41497:
--

[~mridulm80]  For b) and c), shouldn't we allow T2 to use the result of T1's 
cache if rdd1's computation doesn't include any accumulators?

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> success on computation
>   // (accumulator accounting, result caching) but only fail to report its 
> success status due
>   // to the concurrent executor lost. The second task attempt would success.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommissioning even if the rdd has only one 
> copy. For example, decommissioning could migrate the rdd cache block to another 
> executor (the result is effectively the same as having 2 copies) and the 
> decommissioned executor is lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated to fix than expected. I have tried 
> several fixes, but none of them is ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option already fixes the issue in most cases. However, theoretically, an 
> rdd cache block could be reported to the driver right after the driver cleans 
> up the failed task's caches, due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across task attempts for the same 
> task: this option fixes the issue completely. The problem is that it also 
> affects cases where the rdd cache could safely be reused across attempts (e.g., 
> when there is no accumulator operation in the task), which can cause a perf 
> regression;
> Option 3: Introduce an accumulator cache: first, this requires a new framework 
> for supporting accumulator caching; second, the driver should improve its logic 
> to distinguish whether the cached accumulator value should be reported to the 
> user, to avoid overcounting. For example, in the case of a task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Validate task success when a task tries to load the rdd 
> cache: this defines an rdd cache block as valid/accessible only if the task that 
> produced it has succeeded. This could be either overkill or a bit complex 
> (because currently Spark cleans up task state once a task is finished, so we 
> would need to maintain a structure recording whether a task once succeeded or not).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-41512) Row count based shuffle read to optimize global limit after a single partition shuffle (optionally with input partition sorted)

2022-12-13 Thread Rui Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Wang updated SPARK-41512:
-
Description: 
h3. Problem Statement

In the current Spark optimizer, a single-partition shuffle might be created for a 
limit if this limit is not the last non-action operation (e.g. a filter 
follows the limit and the data size exceeds a limit). The output partitions 
feeding into this limit may already be sorted. The single-partition shuffle 
approach has a correctness bug in this case: the shuffle read partitions could 
arrive out of partition order, and the limit exec just takes the first limit rows, 
which can lose the order and thus produce a wrong result. This is a shuffle, so it 
is relatively costly. Meanwhile, to correct this bug, a naive solution is to sort 
all the data fed into the limit again, which is another overhead. 

h3. Proposed idea
So we propose a row-count-based AQE algorithm that addresses this problem in two 
ways:

1. Avoid the extra sort on the shuffle read side (or with the limit exec) to 
achieve the correct result.
2. Avoid reading all shuffle data from mappers for this single-partition shuffle 
to reduce shuffle cost.

Note that 1. only applies to the sorted-partition case, while 2. applies to the 
general single-partition shuffle + limit case.

 

The algorithm works as follows: 

1. Each mapper records a row count when writing shuffle data.

2. Since this is the single-shuffle-partition case, there is only one partition 
but N mappers.

3. An AccumulatorV2 is implemented to collect a list of tuples recording the 
mapping between the mapper id and the number of rows written by that mapper (row 
count metrics).

4. The AQE framework detects a plan shape of a shuffle followed by a global limit.

5. The AQE framework reads only the necessary data from mappers based on the 
limit. For example, if mapper 1 writes 200 rows and mapper 2 writes 300 rows, and 
the limit is 500, AQE creates a shuffle read node that reads from only mappers 1 
and 2, thus skipping the remaining mappers.

6. This is correct for limits over both sorted and non-sorted partitions.


Steps to solve this problem:
1. Add a per-mapper row count collector
2. Add an AQE rule that uses the row count metrics to dynamically decide how much 
shuffle data to read from the mappers. 

  was:
h3. Problem Statement

In current Spark optimizer, a single partition shuffle might be created for a 
limit if this limit is not the last non-action operation (e.g. a filter 
following the limit and the data size exceeds a limit). There is a possibility 
that the previous output partitions before go into this limit are sorted. The 
single partition shuffle approach has a correctness bug in this case: shuffle 
read partitions could be out of partition order and the limit exec just take 
the first limit rows which could lose the order thus result into wrong result. 
This is a shuffle so it is relatively costly. Meanwhile, to correct this bug, a 
native solution is to sort all the data fed into limit again, which is another 
overhead. 

h3. Proposed idea
So we propose a row count based AQE algorithm that optimizes this problem by 
two folds:

Avoid the extra sort on the shuffle read side (or with the limit exec) to 
achieve the correct result.

Avoid reading all shuffle data from mappers for this single partition shuffle 
to reduce shuffle cost.

Note that 1. is only applied for the sorted partition case where 2. is applied 
for general single partition shuffle + limit case

 

The algorithm works as the following: 

1. Each mapper will record a row count when writing shuffle data.

2. Since this is single shuffle partition case, there is only one partition but 
N mappers.

3. A accumulatorV2 is implemented to collect a list of tuple which records the 
mapping between mapper id and the number of row written by the mapper (row 
count metrics)

4. AQE framework detects a plan shape of shuffle plus a global limit.

5. AQE framework reads necessary data from mappers based on the limit. For 
example, if mapper 1 writes 200 rows and mapper 2 writes 300 rows, and the 
limit is 500, AQE creates shuffle read node to write from both mapper 1 and 2, 
thus skip the left mappers.

6. This is both correct for limit with the sorted or non-sorted partitions.


Steps to solve this problem:
1. Add a per mapper row count collector
2. Add a AQE rule to use the row count metrics to decide dynamic how many 
shuffle data from mappers to read. 


> Row count based shuffle read to optimize global limit after a single 
> partition shuffle (optionally with input partition sorted)
> ---
>
> Key: SPARK-41512
> URL: https://issues.apache.org/jira/browse/SPARK-41512
> Project: Spark
>  Issue Type: Task
>  Components: SQL
>

[jira] [Assigned] (SPARK-41513) Implement a Accumulator to collect per mapper row count metrics

2022-12-13 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-41513:


Assignee: Rui Wang  (was: Apache Spark)

> Implement a Accumulator to collect per mapper row count metrics
> ---
>
> Key: SPARK-41513
> URL: https://issues.apache.org/jira/browse/SPARK-41513
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Rui Wang
>Assignee: Rui Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41513) Implement a Accumulator to collect per mapper row count metrics

2022-12-13 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-41513:


Assignee: Apache Spark  (was: Rui Wang)

> Implement a Accumulator to collect per mapper row count metrics
> ---
>
> Key: SPARK-41513
> URL: https://issues.apache.org/jira/browse/SPARK-41513
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Rui Wang
>Assignee: Apache Spark
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41513) Implement a Accumulator to collect per mapper row count metrics

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646901#comment-17646901
 ] 

Apache Spark commented on SPARK-41513:
--

User 'amaliujia' has created a pull request for this issue:
https://github.com/apache/spark/pull/39057

> Implement a Accumulator to collect per mapper row count metrics
> ---
>
> Key: SPARK-41513
> URL: https://issues.apache.org/jira/browse/SPARK-41513
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Rui Wang
>Assignee: Rui Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-41512) Row count based shuffle read to optimize global limit after a single partition shuffle (optionally with input partition sorted)

2022-12-13 Thread Rui Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Wang updated SPARK-41512:
-
Description: 
h3. Problem Statement

In the current Spark optimizer, a single-partition shuffle might be created for a 
limit if this limit is not the last non-action operation (e.g. a filter 
follows the limit and the data size exceeds a limit). The output partitions 
feeding into this limit may already be sorted. The single-partition shuffle 
approach has a correctness bug in this case: the shuffle read partitions could 
arrive out of partition order, and the limit exec just takes the first limit rows, 
which can lose the order and thus produce a wrong result. This is a shuffle, so it 
is relatively costly. Meanwhile, to correct this bug, a naive solution is to sort 
all the data fed into the limit again, which is another overhead. 

h3. Proposed idea
So we propose a row-count-based AQE algorithm that addresses this problem in two 
ways:

Avoid the extra sort on the shuffle read side (or with the limit exec) to 
achieve the correct result.

Avoid reading all shuffle data from mappers for this single-partition shuffle 
to reduce shuffle cost.

Note that 1. only applies to the sorted-partition case, while 2. applies to the 
general single-partition shuffle + limit case.

 

The algorithm works as follows: 

1. Each mapper records a row count when writing shuffle data.

2. Since this is the single-shuffle-partition case, there is only one partition 
but N mappers.

3. An AccumulatorV2 is implemented to collect a list of tuples recording the 
mapping between the mapper id and the number of rows written by that mapper (row 
count metrics).

4. The AQE framework detects a plan shape of a shuffle followed by a global limit.

5. The AQE framework reads only the necessary data from mappers based on the 
limit. For example, if mapper 1 writes 200 rows and mapper 2 writes 300 rows, and 
the limit is 500, AQE creates a shuffle read node that reads from only mappers 1 
and 2, thus skipping the remaining mappers.

6. This is correct for limits over both sorted and non-sorted partitions.


Steps to solve this problem:
1. Add a per-mapper row count collector
2. Add an AQE rule that uses the row count metrics to dynamically decide how much 
shuffle data to read from the mappers. 

  was:
h3. Problem Statement

In current Spark optimizer, a single partition shuffle might be created for a 
limit if this limit is not the last non-action operation (e.g. a filter 
following the limit). There is a possibility that the previous output 
partitions before go into this limit are sorted. The single partition shuffle 
approach has a correctness bug in this case: shuffle read partitions could be 
out of partition order and the limit exec just take the first limit rows which 
could lose the order thus result into wrong result. This is a shuffle so it is 
relatively costly. Meanwhile, to correct this bug, a native solution is to sort 
all the data fed into limit again, which is another overhead. 

h3. Proposed idea
So we propose a row count based AQE algorithm that optimizes this problem by 
two folds:

Avoid the extra sort on the shuffle read side (or with the limit exec) to 
achieve the correct result.

Avoid reading all shuffle data from mappers for this single partition shuffle 
to reduce shuffle cost.

Note that 1. is only applied for the sorted partition case where 2. is applied 
for general single partition shuffle + limit case

 

The algorithm works as the following: 

1. Each mapper will record a row count when writing shuffle data.

2. Since this is single shuffle partition case, there is only one partition but 
N mappers.

3. A accumulatorV2 is implemented to collect a list of tuple which records the 
mapping between mapper id and the number of row written by the mapper (row 
count metrics)

4. AQE framework detects a plan shape of shuffle plus a global limit.

5. AQE framework reads necessary data from mappers based on the limit. For 
example, if mapper 1 writes 200 rows and mapper 2 writes 300 rows, and the 
limit is 500, AQE creates shuffle read node to write from both mapper 1 and 2, 
thus skip the left mappers.

6. This is both correct for limit with the sorted or non-sorted partitions.


Steps to solve this problem:
1. Add a per mapper row count collector
2. Add a AQE rule to use the row count metrics to decide dynamic how many 
shuffle data from mappers to read. 


> Row count based shuffle read to optimize global limit after a single 
> partition shuffle (optionally with input partition sorted)
> ---
>
> Key: SPARK-41512
> URL: https://issues.apache.org/jira/browse/SPARK-41512
> Project: Spark
>  Issue Type: Task
>  Components: SQL
>Affects Versions: 3.4.0
>

[jira] [Updated] (SPARK-41512) Row count based shuffle read to optimize global limit after a single partition shuffle (optionally with input partition sorted)

2022-12-13 Thread Rui Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Wang updated SPARK-41512:
-
Description: 
h3. Problem Statement

In the current Spark optimizer, a single-partition shuffle might be created for a 
limit if this limit is not the last non-action operation (e.g. a filter 
follows the limit). The output partitions feeding into this limit may already be 
sorted. The single-partition shuffle approach has a correctness bug in this case: 
the shuffle read partitions could arrive out of partition order, and the limit 
exec just takes the first limit rows, which can lose the order and thus produce a 
wrong result. This is a shuffle, so it is relatively costly. Meanwhile, to correct 
this bug, a naive solution is to sort all the data fed into the limit again, which 
is another overhead. 

h3. Proposed idea
So we propose a row-count-based AQE algorithm that addresses this problem in two 
ways:

Avoid the extra sort on the shuffle read side (or with the limit exec) to 
achieve the correct result.

Avoid reading all shuffle data from mappers for this single-partition shuffle 
to reduce shuffle cost.

Note that 1. only applies to the sorted-partition case, while 2. applies to the 
general single-partition shuffle + limit case.

 

The algorithm works as follows: 

1. Each mapper records a row count when writing shuffle data.

2. Since this is the single-shuffle-partition case, there is only one partition 
but N mappers.

3. An AccumulatorV2 is implemented to collect a list of tuples recording the 
mapping between the mapper id and the number of rows written by that mapper (row 
count metrics).

4. The AQE framework detects a plan shape of a shuffle followed by a global limit.

5. The AQE framework reads only the necessary data from mappers based on the 
limit. For example, if mapper 1 writes 200 rows and mapper 2 writes 300 rows, and 
the limit is 500, AQE creates a shuffle read node that reads from only mappers 1 
and 2, thus skipping the remaining mappers.

6. This is correct for limits over both sorted and non-sorted partitions.


Steps to solve this problem:
1. Add a per-mapper row count collector
2. Add an AQE rule that uses the row count metrics to dynamically decide how much 
shuffle data to read from the mappers. 

  was:
h3. Problem Statement

In current Spark optimizer, a single partition shuffle might be created for a 
limit if this limit is not the last non-action operation (e.g. a filter 
following the limit). There is a possibility that the previous output 
partitions before go into this limit are sorted. The single partition shuffle 
approach has a correctness bug in this case: shuffle read partitions could be 
out of partition order and the limit exec just take the first limit rows which 
could lose the order thus result into wrong result. This is a shuffle so it is 
relatively costly. Meanwhile, to correct this bug, a native solution is to sort 
all the data fed into limit again, which is another overhead. 

h3. Proposed idea
So we propose a row count based AQE algorithm that optimizes this problem by 
two folds:

Avoid the extra sort on the shuffle read side (or with the limit exec) to 
achieve the correct result.

Avoid reading all shuffle data from mappers for this single partition shuffle 
to reduce shuffle cost.

Note that 1. is only applied for the sorted partition case where 2. is applied 
for general single partition shuffle + limit case

 

The algorithm works as the following: 

1. Each mapper will record a row count when writing shuffle data.

2. Since this is single shuffle partition case, there is only one partition but 
N mappers.

3. A accumulatorV2 is implemented to collect a list of tuple which records the 
mapping between mapper id and the number of row written by the mapper (row 
count metrics)

4. AQE framework detects a plan shape of shuffle plus a global limit.

5. AQE framework reads necessary data from mappers based on the limit. For 
example, if mapper 1 writes 200 rows and mapper 2 writes 300 rows, and the 
limit is 500, AQE creates shuffle read node to write from both mapper 1 and 2, 
thus skip the left mappers.

6. This is both correct for limit with the sorted or non-sorted partitions.


> Row count based shuffle read to optimize global limit after a single 
> partition shuffle (optionally with input partition sorted)
> ---
>
> Key: SPARK-41512
> URL: https://issues.apache.org/jira/browse/SPARK-41512
> Project: Spark
>  Issue Type: Task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Rui Wang
>Assignee: Rui Wang
>Priority: Major
>
> h3. Problem Statement
> In current Spark optimizer, a single partition shuffle might be created for a 
> limit if this limit is not the 

[jira] [Created] (SPARK-41513) Implement a Accumulator to collect per mapper row count metrics

2022-12-13 Thread Rui Wang (Jira)
Rui Wang created SPARK-41513:


 Summary: Implement a Accumulator to collect per mapper row count 
metrics
 Key: SPARK-41513
 URL: https://issues.apache.org/jira/browse/SPARK-41513
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.4.0
Reporter: Rui Wang
Assignee: Rui Wang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41512) Row count based shuffle read to optimize global limit after a single partition shuffle (optionally with input partition sorted)

2022-12-13 Thread Rui Wang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646898#comment-17646898
 ] 

Rui Wang commented on SPARK-41512:
--

cc [~cloud_fan]

> Row count based shuffle read to optimize global limit after a single 
> partition shuffle (optionally with input partition sorted)
> ---
>
> Key: SPARK-41512
> URL: https://issues.apache.org/jira/browse/SPARK-41512
> Project: Spark
>  Issue Type: Task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Rui Wang
>Assignee: Rui Wang
>Priority: Major
>
> h3. Problem Statement
> In the current Spark optimizer, a single-partition shuffle might be created for 
> a limit if this limit is not the last non-action operation (e.g. a filter 
> follows the limit). The output partitions feeding into this limit may already 
> be sorted. The single-partition shuffle approach has a correctness bug in this 
> case: the shuffle read partitions could arrive out of partition order, and the 
> limit exec just takes the first limit rows, which can lose the order and thus 
> produce a wrong result. This is a shuffle, so it is relatively costly. 
> Meanwhile, to correct this bug, a naive solution is to sort all the data fed 
> into the limit again, which is another overhead. 
> h3. Proposed idea
> So we propose a row-count-based AQE algorithm that addresses this problem in 
> two ways:
> Avoid the extra sort on the shuffle read side (or with the limit exec) to 
> achieve the correct result.
> Avoid reading all shuffle data from mappers for this single-partition shuffle 
> to reduce shuffle cost.
> Note that 1. only applies to the sorted-partition case, while 2. applies to 
> the general single-partition shuffle + limit case.
>  
> The algorithm works as follows: 
> 1. Each mapper records a row count when writing shuffle data.
> 2. Since this is the single-shuffle-partition case, there is only one partition 
> but N mappers.
> 3. An AccumulatorV2 is implemented to collect a list of tuples recording the 
> mapping between the mapper id and the number of rows written by that mapper 
> (row count metrics).
> 4. The AQE framework detects a plan shape of a shuffle followed by a global 
> limit.
> 5. The AQE framework reads only the necessary data from mappers based on the 
> limit. For example, if mapper 1 writes 200 rows and mapper 2 writes 300 rows, 
> and the limit is 500, AQE creates a shuffle read node that reads from only 
> mappers 1 and 2, thus skipping the remaining mappers.
> 6. This is correct for limits over both sorted and non-sorted partitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41512) Row count based shuffle read to optimize global limit after a single partition shuffle (optionally with input partition sorted)

2022-12-13 Thread Rui Wang (Jira)
Rui Wang created SPARK-41512:


 Summary: Row count based shuffle read to optimize global limit 
after a single partition shuffle (optionally with input partition sorted)
 Key: SPARK-41512
 URL: https://issues.apache.org/jira/browse/SPARK-41512
 Project: Spark
  Issue Type: Task
  Components: SQL
Affects Versions: 3.4.0
Reporter: Rui Wang
Assignee: Rui Wang


h3. Problem Statement

In the current Spark optimizer, a single-partition shuffle might be created for a 
limit if this limit is not the last non-action operation (e.g. a filter 
follows the limit). The output partitions feeding into this limit may already be 
sorted. The single-partition shuffle approach has a correctness bug in this case: 
the shuffle read partitions could arrive out of partition order, and the limit 
exec just takes the first limit rows, which can lose the order and thus produce a 
wrong result. This is a shuffle, so it is relatively costly. Meanwhile, to correct 
this bug, a naive solution is to sort all the data fed into the limit again, which 
is another overhead. 

h3. Proposed idea
So we propose a row-count-based AQE algorithm that addresses this problem in two 
ways:

Avoid the extra sort on the shuffle read side (or with the limit exec) to 
achieve the correct result.

Avoid reading all shuffle data from mappers for this single-partition shuffle 
to reduce shuffle cost.

Note that 1. only applies to the sorted-partition case, while 2. applies to the 
general single-partition shuffle + limit case.

 

The algorithm works as follows: 

1. Each mapper records a row count when writing shuffle data.

2. Since this is the single-shuffle-partition case, there is only one partition 
but N mappers.

3. An AccumulatorV2 is implemented to collect a list of tuples recording the 
mapping between the mapper id and the number of rows written by that mapper (row 
count metrics).

4. The AQE framework detects a plan shape of a shuffle followed by a global limit.

5. The AQE framework reads only the necessary data from mappers based on the 
limit. For example, if mapper 1 writes 200 rows and mapper 2 writes 300 rows, and 
the limit is 500, AQE creates a shuffle read node that reads from only mappers 1 
and 2, thus skipping the remaining mappers (see the sketch below).

6. This is correct for limits over both sorted and non-sorted partitions.
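
To illustrate step 5 above, a minimal sketch of how the cumulative row counts could pick the mappers to read (illustrative only; not the actual AQE rule):

{code:scala}
// Given (mapperId, rowCount) pairs in mapper order, return the smallest prefix
// of mappers whose cumulative row count covers the limit.
def mappersToRead(rowCounts: Seq[(Int, Long)], limit: Long): Seq[Int] = {
  val cumulative = rowCounts.scanLeft(0L)(_ + _._2).tail
  val needed = cumulative.indexWhere(_ >= limit) match {
    case -1 => rowCounts.size // limit exceeds total rows: read every mapper
    case i  => i + 1
  }
  rowCounts.take(needed).map(_._1)
}

// Example from the description: mapper 1 wrote 200 rows, mapper 2 wrote 300,
// mapper 3 wrote 400; with limit = 500 only mappers 1 and 2 are read.
assert(mappersToRead(Seq((1, 200L), (2, 300L), (3, 400L)), 500L) == Seq(1, 2))
{code}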



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41506) Refactor LiteralExpression to support DataType

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646896#comment-17646896
 ] 

Apache Spark commented on SPARK-41506:
--

User 'zhengruifeng' has created a pull request for this issue:
https://github.com/apache/spark/pull/39056

> Refactor LiteralExpression to support DataType
> --
>
> Key: SPARK-41506
> URL: https://issues.apache.org/jira/browse/SPARK-41506
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, PySpark
>Affects Versions: 3.4.0
>Reporter: Ruifeng Zheng
>Assignee: Ruifeng Zheng
>Priority: Major
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41506) Refactor LiteralExpression to support DataType

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646895#comment-17646895
 ] 

Apache Spark commented on SPARK-41506:
--

User 'zhengruifeng' has created a pull request for this issue:
https://github.com/apache/spark/pull/39056

> Refactor LiteralExpression to support DataType
> --
>
> Key: SPARK-41506
> URL: https://issues.apache.org/jira/browse/SPARK-41506
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, PySpark
>Affects Versions: 3.4.0
>Reporter: Ruifeng Zheng
>Assignee: Ruifeng Zheng
>Priority: Major
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41506) Refactor LiteralExpression to support DataType

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646890#comment-17646890
 ] 

Apache Spark commented on SPARK-41506:
--

User 'HyukjinKwon' has created a pull request for this issue:
https://github.com/apache/spark/pull/39055

> Refactor LiteralExpression to support DataType
> --
>
> Key: SPARK-41506
> URL: https://issues.apache.org/jira/browse/SPARK-41506
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, PySpark
>Affects Versions: 3.4.0
>Reporter: Ruifeng Zheng
>Assignee: Ruifeng Zheng
>Priority: Major
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41506) Refactor LiteralExpression to support DataType

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646889#comment-17646889
 ] 

Apache Spark commented on SPARK-41506:
--

User 'HyukjinKwon' has created a pull request for this issue:
https://github.com/apache/spark/pull/39055

> Refactor LiteralExpression to support DataType
> --
>
> Key: SPARK-41506
> URL: https://issues.apache.org/jira/browse/SPARK-41506
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, PySpark
>Affects Versions: 3.4.0
>Reporter: Ruifeng Zheng
>Assignee: Ruifeng Zheng
>Priority: Major
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41506) Refactor LiteralExpression to support DataType

2022-12-13 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-41506:


Assignee: Ruifeng Zheng

> Refactor LiteralExpression to support DataType
> --
>
> Key: SPARK-41506
> URL: https://issues.apache.org/jira/browse/SPARK-41506
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, PySpark
>Affects Versions: 3.4.0
>Reporter: Ruifeng Zheng
>Assignee: Ruifeng Zheng
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-41506) Refactor LiteralExpression to support DataType

2022-12-13 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-41506.
--
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 39047
[https://github.com/apache/spark/pull/39047]

> Refactor LiteralExpression to support DataType
> --
>
> Key: SPARK-41506
> URL: https://issues.apache.org/jira/browse/SPARK-41506
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, PySpark
>Affects Versions: 3.4.0
>Reporter: Ruifeng Zheng
>Assignee: Ruifeng Zheng
>Priority: Major
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646825#comment-17646825
 ] 

Mridul Muralidharan edited comment on SPARK-41497 at 12/13/22 9:20 PM:
---

> For example, a task is constructed by `rdd1.cache().rdd2`. So if the task 
> fails due to rdd2's computation, I think rdd1's cache should still be able to 
> reuse.

We have three cases here.
For some initial task T1:

a) Computation of rdd2 within T1 should have no issues, since initial 
computation would result in caching it locally - and rdd2 computation is using 
the result of that local read iterator [1].
b) Failure of T1 and now T2 executes for the same partition - as proposed, T2 
will not use the result of T1's cache, since it is not marked as usable (even 
though the block might still be marked cached [2]).
c) Replication and/or decom and T2 runs - same case as (b).


[1] We should special case and allow reads from the same task which cached the 
block - even if it has not yet been marked usable.
[2] When T1 failed, we would drop the blocks which got cached due to it. But 
even in the case of a race, the flag prevents the use of the cached block.


was (Author: mridulm80):
> For example, a task is constructed by `rdd1.cache().rdd2`. So if the task 
> fails due to rdd2's computation, I think rdd1's cache should still be able to 
> reuse.

We have three cases here.
For some initial task T1:

a) Computation of rdd2 within T1 should have no issues, since initial 
computation would result in caching it locally - and rdd2 computation is using 
the result of that iterator [1].
b) Failure of T1 and now T2 executes for the same partition - as proposed, T2 
will not use result of T1's cache, since it not marked as usable (even though 
the block might be still marked cached [2]).
c) Replication and/or decom and T2 runs - same case as (b).


[1] We should special case and allow reads from the same task which cached the 
block - even if it has not yet been marked usable.
[2] When T1 failed, we would drop the blocks which got cached due to it. But 
even in the case of a race, the flag prevents the use of the cached block.

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> success on computation
>   // (accumulator accounting, result caching) but only fail to report its 
> success status due
>   // to the concurrent executor lost. The second task attempt would success.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is actually the same with 2 copies) and the 
> decommissioned executor lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes but all of them are not ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in 

[jira] [Comment Edited] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646825#comment-17646825
 ] 

Mridul Muralidharan edited comment on SPARK-41497 at 12/13/22 9:20 PM:
---

> For example, a task is constructed by `rdd1.cache().rdd2`. So if the task 
> fails due to rdd2's computation, I think rdd1's cache should still be able to 
> reuse.

We have three cases here.
For some initial task T1:

a) Computation of rdd2 within T1 should have no issues, since initial 
computation would result in caching it locally - and rdd2 computation is using 
the result of that local read iterator [1].
b) Failure of T1 and now T2 executes for the same partition - as proposed, T2 
will not use the result of T1's cache, since it is not marked as usable (even 
though the block might still be marked cached [2]).
c) Replication and/or decom and T2 runs - same case as (b).

Probably 'usable' is the wrong term - 'visible' might be better? That is, is 
this block visible to others (outside of the generating task)?

[1] We should special case and allow reads from the same task which cached the 
block - even if it has not yet been marked usable.
[2] When T1 failed, we would drop the blocks which got cached due to it. But 
even in the case of a race, the flag prevents the use of the cached block.


was (Author: mridulm80):
> For example, a task is constructed by `rdd1.cache().rdd2`. So if the task 
> fails due to rdd2's computation, I think rdd1's cache should still be able to 
> reuse.

We have three cases here.
For some initial task T1:

a) Computation of rdd2 within T1 should have no issues, since initial 
computation would result in caching it locally - and rdd2 computation is using 
the result of that local read iterator [1].
b) Failure of T1 and now T2 executes for the same partition - as proposed, T2 
will not use result of T1's cache, since it not marked as usable (even though 
the block might be still marked cached [2]).
c) Replication and/or decom and T2 runs - same case as (b).


[1] We should special case and allow reads from the same task which cached the 
block - even if it has not yet been marked usable.
[2] When T1 failed, we would drop the blocks which got cached due to it. But 
even in the case of a race, the flag prevents the use of the cached block.

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> success on computation
>   // (accumulator accounting, result caching) but only fail to report its 
> success status due
>   // to the concurrent executor lost. The second task attempt would success.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is actually the same with 2 copies) and the 
> decommissioned executor lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes 

[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646825#comment-17646825
 ] 

Mridul Muralidharan commented on SPARK-41497:
-

> For example, a task is constructed by `rdd1.cache().rdd2`. So if the task 
> fails due to rdd2's computation, I think rdd1's cache should still be able to 
> reuse.

We have three cases here.
For some initial task T1:

a) Computation of rdd2 within T1 should have no issues, since initial 
computation would result in caching it locally - and rdd2 computation is using 
the result of that iterator [1].
b) Failure of T1 and now T2 executes for the same partition - as proposed, T2
will not use the result of T1's cache, since it is not marked as usable (even
though the block might still be marked cached [2]).
c) Replication and/or decom and T2 runs - same case as (b).


[1] We should special case and allow reads from the same task which cached the 
block - even if it has not yet been marked usable.
[2] When T1 failed, we would drop the blocks which got cached due to it. But 
even in the case of a race, the flag prevents the use of the cached block.

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> success on computation
>   // (accumulator accounting, result caching) but only fail to report its 
> success status due
>   // to the concurrent executor lost. The second task attempt would success.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is actually the same with 2 copies) and the 
> decommissioned executor lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes but all of them are not ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across the task attempts for the same 
> task: this option can 100% fix the issue. The problem is this way can also 
> affect the case where rdd cache can be reused across the attempts (e.g., when 
> there is no accumulator operation in the task), which can have perf 
> regression;
> Option 3: Introduce accumulator cache: first, this requires a new framework 
> for supporting accumulator cache; second, the driver should improve its logic 
> to distinguish whether the accumulator cache value should be reported to the 
> user to avoid overcounting. For example, in the case of task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task trying to 

[jira] [Commented] (SPARK-27561) Support "lateral column alias references" to allow column aliases to be used within SELECT clauses

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646823#comment-17646823
 ] 

Apache Spark commented on SPARK-27561:
--

User 'gengliangwang' has created a pull request for this issue:
https://github.com/apache/spark/pull/39054

> Support "lateral column alias references" to allow column aliases to be used 
> within SELECT clauses
> --
>
> Key: SPARK-27561
> URL: https://issues.apache.org/jira/browse/SPARK-27561
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Josh Rosen
>Assignee: Xinyi Yu
>Priority: Major
> Fix For: 3.4.0
>
>
> Amazon Redshift has a feature called "lateral column alias references": 
> [https://aws.amazon.com/about-aws/whats-new/2018/08/amazon-redshift-announces-support-for-lateral-column-alias-reference/].
>  Quoting from that blogpost:
> {quote}The support for lateral column alias reference enables you to write 
> queries without repeating the same expressions in the SELECT list. For 
> example, you can define the alias 'probability' and use it within the same 
> select statement:
> {code:java}
> select clicks / impressions as probability, round(100 * probability, 1) as 
> percentage from raw_data;
> {code}
> {quote}
> There's more information about this feature on 
> [https://docs.aws.amazon.com/redshift/latest/dg/r_SELECT_list.html:]
> {quote}The benefit of the lateral alias reference is you don't need to repeat 
> the aliased expression when building more complex expressions in the same 
> target list. When Amazon Redshift parses this type of reference, it just 
> inlines the previously defined aliases. If there is a column with the same 
> name defined in the FROM clause as the previously aliased expression, the 
> column in the FROM clause takes priority. For example, in the above query if 
> there is a column named 'probability' in table raw_data, the 'probability' in 
> the second expression in the target list will refer to that column instead of 
> the alias name 'probability'.
> {quote}
> It would be nice if Spark supported this syntax. I don't think that this is 
> standard SQL, so it might be a good idea to research if other SQL databases 
> support similar syntax (and to see if they implement the same column 
> resolution strategy as Redshift).
> We should also consider whether this needs to be feature-flagged as part of a 
> specific SQL compatibility mode / dialect.
> One possibly-related existing ticket: SPARK-9338, which discusses the use of 
> SELECT aliases in GROUP BY expressions.
> /cc [~hvanhovell]
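
For a concrete flavor of the behaviour described above, a small illustrative sketch in Spark SQL. The SparkSession `spark`, the temp view and its columns are assumptions for this example, and the first query only resolves once the lateral column alias feature is available:

{code:scala}
// Illustrative only; assumes a SparkSession `spark`.
spark.range(1, 11)
  .selectExpr("id AS clicks", "id * 2 AS impressions")
  .createOrReplaceTempView("raw_data")

// With lateral column alias support, `probability` can be referenced later
// in the same SELECT list:
spark.sql(
  """SELECT clicks / impressions AS probability,
    |       round(100 * probability, 1) AS percentage
    |FROM raw_data""".stripMargin).show()

// Without the feature, the expression has to be repeated (or pushed into a subquery):
spark.sql(
  """SELECT clicks / impressions AS probability,
    |       round(100 * (clicks / impressions), 1) AS percentage
    |FROM raw_data""".stripMargin).show()
{code}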



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27561) Support "lateral column alias references" to allow column aliases to be used within SELECT clauses

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646824#comment-17646824
 ] 

Apache Spark commented on SPARK-27561:
--

User 'gengliangwang' has created a pull request for this issue:
https://github.com/apache/spark/pull/39054

> Support "lateral column alias references" to allow column aliases to be used 
> within SELECT clauses
> --
>
> Key: SPARK-27561
> URL: https://issues.apache.org/jira/browse/SPARK-27561
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Josh Rosen
>Assignee: Xinyi Yu
>Priority: Major
> Fix For: 3.4.0
>
>
> Amazon Redshift has a feature called "lateral column alias references": 
> [https://aws.amazon.com/about-aws/whats-new/2018/08/amazon-redshift-announces-support-for-lateral-column-alias-reference/].
>  Quoting from that blogpost:
> {quote}The support for lateral column alias reference enables you to write 
> queries without repeating the same expressions in the SELECT list. For 
> example, you can define the alias 'probability' and use it within the same 
> select statement:
> {code:java}
> select clicks / impressions as probability, round(100 * probability, 1) as 
> percentage from raw_data;
> {code}
> {quote}
> There's more information about this feature on 
> [https://docs.aws.amazon.com/redshift/latest/dg/r_SELECT_list.html:]
> {quote}The benefit of the lateral alias reference is you don't need to repeat 
> the aliased expression when building more complex expressions in the same 
> target list. When Amazon Redshift parses this type of reference, it just 
> inlines the previously defined aliases. If there is a column with the same 
> name defined in the FROM clause as the previously aliased expression, the 
> column in the FROM clause takes priority. For example, in the above query if 
> there is a column named 'probability' in table raw_data, the 'probability' in 
> the second expression in the target list will refer to that column instead of 
> the alias name 'probability'.
> {quote}
> It would be nice if Spark supported this syntax. I don't think that this is 
> standard SQL, so it might be a good idea to research if other SQL databases 
> support similar syntax (and to see if they implement the same column 
> resolution strategy as Redshift).
> We should also consider whether this needs to be feature-flagged as part of a 
> specific SQL compatibility mode / dialect.
> One possibly-related existing ticket: SPARK-9338, which discusses the use of 
> SELECT aliases in GROUP BY expressions.
> /cc [~hvanhovell]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41062) Rename UNSUPPORTED_CORRELATED_REFERENCE to CORRELATED_REFERENCE

2022-12-13 Thread Max Gekk (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Max Gekk reassigned SPARK-41062:


Assignee: Haejoon Lee

> Rename UNSUPPORTED_CORRELATED_REFERENCE to CORRELATED_REFERENCE
> ---
>
> Key: SPARK-41062
> URL: https://issues.apache.org/jira/browse/SPARK-41062
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Haejoon Lee
>Assignee: Haejoon Lee
>Priority: Major
>
> We should use clear and brief name for every error class.
> This sub-error class duplicates the main class.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-41062) Rename UNSUPPORTED_CORRELATED_REFERENCE to CORRELATED_REFERENCE

2022-12-13 Thread Max Gekk (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Max Gekk resolved SPARK-41062.
--
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 38576
[https://github.com/apache/spark/pull/38576]

> Rename UNSUPPORTED_CORRELATED_REFERENCE to CORRELATED_REFERENCE
> ---
>
> Key: SPARK-41062
> URL: https://issues.apache.org/jira/browse/SPARK-41062
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Haejoon Lee
>Assignee: Haejoon Lee
>Priority: Major
> Fix For: 3.4.0
>
>
> We should use clear and brief name for every error class.
> This sub-error class duplicates the main class.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-41482) Upgrade dropwizard metrics 4.2.13

2022-12-13 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-41482.
---
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 39026
[https://github.com/apache/spark/pull/39026]

> Upgrade dropwizard metrics 4.2.13
> -
>
> Key: SPARK-41482
> URL: https://issues.apache.org/jira/browse/SPARK-41482
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 3.4.0
>Reporter: Yang Jie
>Assignee: Yang Jie
>Priority: Minor
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41482) Upgrade dropwizard metrics 4.2.13

2022-12-13 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun reassigned SPARK-41482:
-

Assignee: Yang Jie

> Upgrade dropwizard metrics 4.2.13
> -
>
> Key: SPARK-41482
> URL: https://issues.apache.org/jira/browse/SPARK-41482
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 3.4.0
>Reporter: Yang Jie
>Assignee: Yang Jie
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers

2022-12-13 Thread Ohad Raviv (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646734#comment-17646734
 ] 

Ohad Raviv commented on SPARK-41510:


The conda solution is more suited to "static" packages.

The scenario is this:

We're developing a python library (more than one .py file) and want to do it 
interactively in notebooks. So we have all the modules in some folder and we 
add this folder to the driver's sys.path.

Then, if we for example use a function from the module inside a UDF, we get: 
"ModuleNotFoundError: No module named 'some_module' ".

The reason is that some_module is not in the PYTHONPATH/sys.path of the 
workers. The code itself is accessible to the workers, for example in a shared 
NFS folder.

So all we need now is to add the path.

We can do it inside the UDF with something like:

```
if "/shared_nfs/my_folder" not in sys.path:
    sys.path.insert(0, "/shared_nfs/my_folder")
```

But that is both very ugly and only a partial solution, as it works only in the 
UDF case.

The suggestion is to have some kind of mechanism to easily add a folder to the 
workers' sys.path.

The option of wrapping the code in a zip/egg and adding it makes for a very long 
development cycle and requires restarting the Spark session, so the notebook 
loses its state.

With the suggestion above we could actually edit the python package 
interactively and see the changes almost immediately.

Hope it is clearer now.

 

 

> Support easy way for user defined PYTHONPATH in workers
> ---
>
> Key: SPARK-41510
> URL: https://issues.apache.org/jira/browse/SPARK-41510
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.1
>Reporter: Ohad Raviv
>Priority: Minor
>
> When working interactively with Spark through notebooks in various envs - 
> Databricks/YARN I often encounter a very frustrating process of trying to add 
> new python modules and even change their code without starting a new spark 
> session/cluster.
> In the driver side it is easy to add things like `sys.path.append()` but if 
> for example, if a UDF code is importing a function from a local module, then 
> the pickle boundaries will assume that the module exists in the workers, and 
> fail on "python module does not exist..".
> To update the code "online" I can add NFS volume to the workers' PYTHONPATH.
> However, setting the PYTHONPATH in the workers is not easy as it gets 
> overridden by someone (databricks/spark) along the way. a few ugly 
> workarounds are suggested like running a "dummy" UDF on the workers to add 
> the folder to the sys.path.
> I think all of that could easily be solved if we just add a dedicated 
> `spark.conf` the will get merged into the worker's PYTHONPATH, just here:
> [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94]
>  
> please tell me what you think, and I will make the PR.
> thanks.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27561) Support "lateral column alias references" to allow column aliases to be used within SELECT clauses

2022-12-13 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-27561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-27561:
---

Assignee: Xinyi Yu

> Support "lateral column alias references" to allow column aliases to be used 
> within SELECT clauses
> --
>
> Key: SPARK-27561
> URL: https://issues.apache.org/jira/browse/SPARK-27561
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Josh Rosen
>Assignee: Xinyi Yu
>Priority: Major
> Fix For: 3.4.0
>
>
> Amazon Redshift has a feature called "lateral column alias references": 
> [https://aws.amazon.com/about-aws/whats-new/2018/08/amazon-redshift-announces-support-for-lateral-column-alias-reference/].
>  Quoting from that blogpost:
> {quote}The support for lateral column alias reference enables you to write 
> queries without repeating the same expressions in the SELECT list. For 
> example, you can define the alias 'probability' and use it within the same 
> select statement:
> {code:java}
> select clicks / impressions as probability, round(100 * probability, 1) as 
> percentage from raw_data;
> {code}
> {quote}
> There's more information about this feature on 
> [https://docs.aws.amazon.com/redshift/latest/dg/r_SELECT_list.html:]
> {quote}The benefit of the lateral alias reference is you don't need to repeat 
> the aliased expression when building more complex expressions in the same 
> target list. When Amazon Redshift parses this type of reference, it just 
> inlines the previously defined aliases. If there is a column with the same 
> name defined in the FROM clause as the previously aliased expression, the 
> column in the FROM clause takes priority. For example, in the above query if 
> there is a column named 'probability' in table raw_data, the 'probability' in 
> the second expression in the target list will refer to that column instead of 
> the alias name 'probability'.
> {quote}
> It would be nice if Spark supported this syntax. I don't think that this is 
> standard SQL, so it might be a good idea to research if other SQL databases 
> support similar syntax (and to see if they implement the same column 
> resolution strategy as Redshift).
> We should also consider whether this needs to be feature-flagged as part of a 
> specific SQL compatibility mode / dialect.
> One possibly-related existing ticket: SPARK-9338, which discusses the use of 
> SELECT aliases in GROUP BY expressions.
> /cc [~hvanhovell]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27561) Support "lateral column alias references" to allow column aliases to be used within SELECT clauses

2022-12-13 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-27561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-27561.
-
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 38776
[https://github.com/apache/spark/pull/38776]

> Support "lateral column alias references" to allow column aliases to be used 
> within SELECT clauses
> --
>
> Key: SPARK-27561
> URL: https://issues.apache.org/jira/browse/SPARK-27561
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Josh Rosen
>Priority: Major
> Fix For: 3.4.0
>
>
> Amazon Redshift has a feature called "lateral column alias references": 
> [https://aws.amazon.com/about-aws/whats-new/2018/08/amazon-redshift-announces-support-for-lateral-column-alias-reference/].
>  Quoting from that blogpost:
> {quote}The support for lateral column alias reference enables you to write 
> queries without repeating the same expressions in the SELECT list. For 
> example, you can define the alias 'probability' and use it within the same 
> select statement:
> {code:java}
> select clicks / impressions as probability, round(100 * probability, 1) as 
> percentage from raw_data;
> {code}
> {quote}
> There's more information about this feature on 
> [https://docs.aws.amazon.com/redshift/latest/dg/r_SELECT_list.html:]
> {quote}The benefit of the lateral alias reference is you don't need to repeat 
> the aliased expression when building more complex expressions in the same 
> target list. When Amazon Redshift parses this type of reference, it just 
> inlines the previously defined aliases. If there is a column with the same 
> name defined in the FROM clause as the previously aliased expression, the 
> column in the FROM clause takes priority. For example, in the above query if 
> there is a column named 'probability' in table raw_data, the 'probability' in 
> the second expression in the target list will refer to that column instead of 
> the alias name 'probability'.
> {quote}
> It would be nice if Spark supported this syntax. I don't think that this is 
> standard SQL, so it might be a good idea to research if other SQL databases 
> support similar syntax (and to see if they implement the same column 
> resolution strategy as Redshift).
> We should also consider whether this needs to be feature-flagged as part of a 
> specific SQL compatibility mode / dialect.
> One possibly-related existing ticket: SPARK-9338, which discusses the use of 
> SELECT aliases in GROUP BY expressions.
> /cc [~hvanhovell]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-39601) AllocationFailure should not be treated as exitCausedByApp when driver is shutting down

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-39601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646682#comment-17646682
 ] 

Apache Spark commented on SPARK-39601:
--

User 'pan3793' has created a pull request for this issue:
https://github.com/apache/spark/pull/39053

> AllocationFailure should not be treated as exitCausedByApp when driver is 
> shutting down
> ---
>
> Key: SPARK-39601
> URL: https://issues.apache.org/jira/browse/SPARK-39601
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 3.3.0
>Reporter: Cheng Pan
>Assignee: Cheng Pan
>Priority: Major
> Fix For: 3.4.0
>
>
> I observed some Spark Applications successfully completed all jobs but failed 
> during the shutting down phase w/ reason: Max number of executor failures 
> (16) reached, the timeline is
> Driver - Job success, Spark starts shutting down procedure.
> {code:java}
> 2022-06-23 19:50:55 CST AbstractConnector INFO - Stopped 
> Spark@74e9431b{HTTP/1.1, (http/1.1)}
> {0.0.0.0:0}
> 2022-06-23 19:50:55 CST SparkUI INFO - Stopped Spark web UI at 
> http://hadoop2627.xxx.org:28446
> 2022-06-23 19:50:55 CST YarnClusterSchedulerBackend INFO - Shutting down all 
> executors
> {code}
> Driver - A container allocate successful during shutting down phase.
> {code:java}
> 2022-06-23 19:52:21 CST YarnAllocator INFO - Launching container 
> container_e94_1649986670278_7743380_02_25 on host hadoop4388.xxx.org for 
> executor with ID 24 for ResourceProfile Id 0{code}
> Executor - The executor can not connect to driver endpoint because driver 
> already stopped the endpoint.
> {code:java}
> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1911)
>   at 
> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
>   at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:393)
>   at 
> org.apache.spark.executor.YarnCoarseGrainedExecutorBackend$.main(YarnCoarseGrainedExecutorBackend.scala:81)
>   at 
> org.apache.spark.executor.YarnCoarseGrainedExecutorBackend.main(YarnCoarseGrainedExecutorBackend.scala)
> Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: 
>   at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
>   at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
>   at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
>   at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$9(CoarseGrainedExecutorBackend.scala:413)
>   at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
>   at 
> scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
>   at scala.collection.immutable.Range.foreach(Range.scala:158)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
>   at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$7(CoarseGrainedExecutorBackend.scala:411)
>   at 
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:62)
>   at 
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
>   ... 4 more
> Caused by: org.apache.spark.rpc.RpcEndpointNotFoundException: Cannot find 
> endpoint: spark://coarsegrainedschedu...@hadoop2627.xxx.org:21956
>   at 
> org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$asyncSetupEndpointRefByURI$1(NettyRpcEnv.scala:148)
>   at 
> org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$asyncSetupEndpointRefByURI$1$adapted(NettyRpcEnv.scala:144)
>   at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:307)
>   at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
>   at org.apache.spark.util.ThreadUtils$$anon$1.execute(ThreadUtils.scala:99)
>   at 
> scala.concurrent.impl.ExecutionContextImpl$$anon$4.execute(ExecutionContextImpl.scala:138)
>   at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288){code}
> Driver - YarnAllocator received container launch error message and treat it 
> as 

[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread huangtengfei (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646658#comment-17646658
 ] 

huangtengfei commented on SPARK-41497:
--

I also think that option 3/4 (including the improved proposal [~mridulm80] 
suggested) would be promising; the issue could be resolved from either the 
accumulator side or the rdd cache side.
Option 4 seems more straightforward since it is a complement to the existing 
cache mechanism, and making the decision based on task status could be a 
feasible solution. As mentioned above, the downside is that it may be overkill; 
if such cases are low-probability events, maybe that is acceptable.

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> success on computation
>   // (accumulator accounting, result caching) but only fail to report its 
> success status due
>   // to the concurrent executor lost. The second task attempt would success.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is actually the same with 2 copies) and the 
> decommissioned executor lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes but all of them are not ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across the task attempts for the same 
> task: this option can 100% fix the issue. The problem is this way can also 
> affect the case where rdd cache can be reused across the attempts (e.g., when 
> there is no accumulator operation in the task), which can have perf 
> regression;
> Option 3: Introduce accumulator cache: first, this requires a new framework 
> for supporting accumulator cache; second, the driver should improve its logic 
> to distinguish whether the accumulator cache value should be reported to the 
> user to avoid overcounting. For example, in the case of task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task trying to load the rdd 
> cache: this way defines a rdd cache is only valid/accessible if the task has 
> succeeded. This way could be either overkill or a bit complex (because 
> currently Spark would clean up the task state once it’s finished. So we need 
> to maintain a structure to know if task once succeeded or not. )



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: 

[jira] [Resolved] (SPARK-39601) AllocationFailure should not be treated as exitCausedByApp when driver is shutting down

2022-12-13 Thread Thomas Graves (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves resolved SPARK-39601.
---
Fix Version/s: 3.4.0
 Assignee: Cheng Pan
   Resolution: Fixed

> AllocationFailure should not be treated as exitCausedByApp when driver is 
> shutting down
> ---
>
> Key: SPARK-39601
> URL: https://issues.apache.org/jira/browse/SPARK-39601
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 3.3.0
>Reporter: Cheng Pan
>Assignee: Cheng Pan
>Priority: Major
> Fix For: 3.4.0
>
>
> I observed some Spark Applications successfully completed all jobs but failed 
> during the shutting down phase w/ reason: Max number of executor failures 
> (16) reached, the timeline is
> Driver - Job success, Spark starts shutting down procedure.
> {code:java}
> 2022-06-23 19:50:55 CST AbstractConnector INFO - Stopped 
> Spark@74e9431b{HTTP/1.1, (http/1.1)}
> {0.0.0.0:0}
> 2022-06-23 19:50:55 CST SparkUI INFO - Stopped Spark web UI at 
> http://hadoop2627.xxx.org:28446
> 2022-06-23 19:50:55 CST YarnClusterSchedulerBackend INFO - Shutting down all 
> executors
> {code}
> Driver - A container allocate successful during shutting down phase.
> {code:java}
> 2022-06-23 19:52:21 CST YarnAllocator INFO - Launching container 
> container_e94_1649986670278_7743380_02_25 on host hadoop4388.xxx.org for 
> executor with ID 24 for ResourceProfile Id 0{code}
> Executor - The executor can not connect to driver endpoint because driver 
> already stopped the endpoint.
> {code:java}
> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1911)
>   at 
> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
>   at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:393)
>   at 
> org.apache.spark.executor.YarnCoarseGrainedExecutorBackend$.main(YarnCoarseGrainedExecutorBackend.scala:81)
>   at 
> org.apache.spark.executor.YarnCoarseGrainedExecutorBackend.main(YarnCoarseGrainedExecutorBackend.scala)
> Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: 
>   at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
>   at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
>   at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
>   at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$9(CoarseGrainedExecutorBackend.scala:413)
>   at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
>   at 
> scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
>   at scala.collection.immutable.Range.foreach(Range.scala:158)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
>   at 
> org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$7(CoarseGrainedExecutorBackend.scala:411)
>   at 
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:62)
>   at 
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
>   ... 4 more
> Caused by: org.apache.spark.rpc.RpcEndpointNotFoundException: Cannot find 
> endpoint: spark://coarsegrainedschedu...@hadoop2627.xxx.org:21956
>   at 
> org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$asyncSetupEndpointRefByURI$1(NettyRpcEnv.scala:148)
>   at 
> org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$asyncSetupEndpointRefByURI$1$adapted(NettyRpcEnv.scala:144)
>   at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:307)
>   at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
>   at org.apache.spark.util.ThreadUtils$$anon$1.execute(ThreadUtils.scala:99)
>   at 
> scala.concurrent.impl.ExecutionContextImpl$$anon$4.execute(ExecutionContextImpl.scala:138)
>   at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288){code}
> Driver - YarnAllocator received container launch error message and treat it 
> as `exitCausedByApp`
> {code:java}
> 2022-06-23 19:52:27 CST YarnAllocator 

[jira] [Resolved] (SPARK-41478) Assign a name to the error class _LEGACY_ERROR_TEMP_1234

2022-12-13 Thread Max Gekk (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Max Gekk resolved SPARK-41478.
--
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 39018
[https://github.com/apache/spark/pull/39018]

> Assign a name to the error class _LEGACY_ERROR_TEMP_1234
> 
>
> Key: SPARK-41478
> URL: https://issues.apache.org/jira/browse/SPARK-41478
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: BingKun Pan
>Assignee: BingKun Pan
>Priority: Minor
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41478) Assign a name to the error class _LEGACY_ERROR_TEMP_1234

2022-12-13 Thread Max Gekk (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Max Gekk reassigned SPARK-41478:


Assignee: BingKun Pan

> Assign a name to the error class _LEGACY_ERROR_TEMP_1234
> 
>
> Key: SPARK-41478
> URL: https://issues.apache.org/jira/browse/SPARK-41478
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: BingKun Pan
>Assignee: BingKun Pan
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646639#comment-17646639
 ] 

wuyi commented on SPARK-41497:
--

[~mridulm80] Sounds like a better idea than option 4. But I think it still 
doesn't work well for a case like this:

For example, a task is constructed by `rdd1.cache().rdd2`. So if the task fails 
due to rdd2's computation, I think rdd1's cache should still be reusable.
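
A minimal sketch of that shape of job (illustrative only; it assumes a SparkContext `sc` as in the reproduction above, a deployment where task retries are enabled, and uses the task attempt number just to inject a failure):

{code:scala}
import org.apache.spark.TaskContext

// rdd1 is cached; the task then fails only inside rdd2's closure.
val rdd1 = sc.parallelize(0 until 10, 1).map(_ + 1).cache()
val rdd2 = rdd1.map { x =>
  // Fail the first attempt only, after rdd1's partition has already been
  // computed (and therefore cached) by the block manager.
  if (TaskContext.get().attemptNumber() == 0) {
    throw new RuntimeException("simulated failure in rdd2's computation")
  }
  x * 2
}
rdd2.count()  // the retried attempt could, in principle, reuse rdd1's cached partition
{code}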

 

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> success on computation
>   // (accumulator accounting, result caching) but only fail to report its 
> success status due
>   // to the concurrent executor lost. The second task attempt would success.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is actually the same with 2 copies) and the 
> decommissioned executor lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes but all of them are not ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across the task attempts for the same 
> task: this option can 100% fix the issue. The problem is this way can also 
> affect the case where rdd cache can be reused across the attempts (e.g., when 
> there is no accumulator operation in the task), which can have perf 
> regression;
> Option 3: Introduce accumulator cache: first, this requires a new framework 
> for supporting accumulator cache; second, the driver should improve its logic 
> to distinguish whether the accumulator cache value should be reported to the 
> user to avoid overcounting. For example, in the case of task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task trying to load the rdd 
> cache: this way defines a rdd cache is only valid/accessible if the task has 
> succeeded. This way could be either overkill or a bit complex (because 
> currently Spark would clean up the task state once it’s finished. So we need 
> to maintain a structure to know if task once succeeded or not. )



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread wuyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuyi updated SPARK-41497:
-
Description: 
Accumulator could be undercounted when the retried task has rdd cache.  See the 
example below and you could also find the completed and reproducible example at 
[https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]

  
{code:scala}
test("SPARK-XXX") {
  // Set up a cluster with 2 executors
  val conf = new SparkConf()
.setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite")
  sc = new SparkContext(conf)
  // Set up a custom task scheduler. The scheduler will fail the first task 
attempt of the job
  // submitted below. In particular, the failed first attempt task would 
success on computation
  // (accumulator accounting, result caching) but only fail to report its 
success status due
  // to the concurrent executor lost. The second task attempt would success.
  taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
  val myAcc = sc.longAccumulator("myAcc")
  // Initiate a rdd with only one partition so there's only one task and 
specify the storage level
  // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
executors.
  val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
myAcc.add(100)
iter.map(x => x + 1)
  }.persist(StorageLevel.MEMORY_ONLY_2)
  // This will pass since the second task attempt will succeed
  assert(rdd.count() === 10)
  // This will fail due to `myAcc.add(100)` won't be executed during the second 
task attempt's
  // execution. Because the second task attempt will load the rdd cache 
directly instead of
  // executing the task function so `myAcc.add(100)` is skipped.
  assert(myAcc.value === 100)
} {code}
 

We could also hit this issue with decommission even if the rdd only has one 
copy. For example, decommission could migrate the rdd cache block to another 
executor (the result is effectively the same as having 2 copies) and the 
decommissioned executor could be lost before the task reports its success 
status to the driver. 

 

And the issue is a bit more complicated to fix than expected. I have tried 
several fixes, but none of them is ideal:

Option 1: Clean up any rdd cache related to the failed task: in practice, this 
option can already fix the issue in most cases. However, theoretically, the rdd 
cache could be reported to the driver right after the driver cleans up the 
failed task's caches, due to asynchronous communication. So this option can't 
resolve the issue thoroughly;

Option 2: Disallow rdd cache reuse across task attempts for the same task: 
this option can 100% fix the issue. The problem is that it also affects cases 
where the rdd cache could safely be reused across attempts (e.g., when there is 
no accumulator operation in the task), which can cause a perf regression;

Option 3: Introduce an accumulator cache: first, this requires a new framework 
for supporting the accumulator cache; second, the driver should improve its 
logic to distinguish whether the cached accumulator value should be reported to 
the user to avoid overcounting. For example, in the case of a task retry, the 
value should be reported. However, in the case of rdd cache reuse, the value 
shouldn't be reported (should it?);

Option 4: Do task success validation when a task tries to load the rdd cache: 
this defines an rdd cache as valid/accessible only if the task has succeeded. 
This could be either overkill or a bit complex (because Spark currently cleans 
up the task state once it's finished, so we need to maintain a structure to 
know whether a task once succeeded or not).

  was:
Accumulator could be undercounted when the retried task has rdd cache.  See the 
example below and you could also find the completed and reproducible example at 
[https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]

  
{code:scala}
test("SPARK-XXX") {
  // Set up a cluster with 2 executors
  val conf = new SparkConf()
.setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite")
  sc = new SparkContext(conf)
  // Set up a custom task scheduler. The scheduler will fail the first task 
attempt of the job
  // submitted below. In particular, the failed first attempt task would 
success on computation
  // (accumulator accounting, result caching) but only fail to report its 
success status due
  // to the concurrent executor lost. The second task attempt would success.
  taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
  val myAcc = sc.longAccumulator("myAcc")
  // Initiate a rdd with only one partition so there's only one task and 
specify the storage level
  // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
executors.
  val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
myAcc.add(100)
iter.map(x => x + 1)
  }.persist(StorageLevel.MEMORY_ONLY_2)
  // This will 

[jira] [Commented] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers

2022-12-13 Thread Hyukjin Kwon (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646620#comment-17646620
 ] 

Hyukjin Kwon commented on SPARK-41510:
--

What about using Conda 
([https://www.databricks.com/blog/2020/12/22/how-to-manage-python-dependencies-in-pyspark.html])
 or adding python files via --py-files? It would be great to elaborate on the usage.

> Support easy way for user defined PYTHONPATH in workers
> ---
>
> Key: SPARK-41510
> URL: https://issues.apache.org/jira/browse/SPARK-41510
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.1
>Reporter: Ohad Raviv
>Priority: Minor
>
> When working interactively with Spark through notebooks in various envs - 
> Databricks/YARN I often encounter a very frustrating process of trying to add 
> new python modules and even change their code without starting a new spark 
> session/cluster.
> In the driver side it is easy to add things like `sys.path.append()` but if 
> for example, if a UDF code is importing a function from a local module, then 
> the pickle boundaries will assume that the module exists in the workers, and 
> fail on "python module does not exist..".
> To update the code "online" I can add NFS volume to the workers' PYTHONPATH.
> However, setting the PYTHONPATH in the workers is not easy as it gets 
> overridden by someone (databricks/spark) along the way. a few ugly 
> workarounds are suggested like running a "dummy" UDF on the workers to add 
> the folder to the sys.path.
> I think all of that could easily be solved if we just add a dedicated 
> `spark.conf` the will get merged into the worker's PYTHONPATH, just here:
> [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94]
>  
> please tell me what you think, and I will make the PR.
> thanks.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers

2022-12-13 Thread Ohad Raviv (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646606#comment-17646606
 ] 

Ohad Raviv edited comment on SPARK-41510 at 12/13/22 12:23 PM:
---

[~hvanhovell], [~gurwls223] - can you please look at this/refer that to someone?


was (Author: uzadude):
[~hvanhovell] - can you please refer that to someone?

> Support easy way for user defined PYTHONPATH in workers
> ---
>
> Key: SPARK-41510
> URL: https://issues.apache.org/jira/browse/SPARK-41510
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.1
>Reporter: Ohad Raviv
>Priority: Minor
>
> When working interactively with Spark through notebooks in various envs - 
> Databricks/YARN I often encounter a very frustrating process of trying to add 
> new python modules and even change their code without starting a new spark 
> session/cluster.
> In the driver side it is easy to add things like `sys.path.append()` but if 
> for example, if a UDF code is importing a function from a local module, then 
> the pickle boundaries will assume that the module exists in the workers, and 
> fail on "python module does not exist..".
> To update the code "online" I can add NFS volume to the workers' PYTHONPATH.
> However, setting the PYTHONPATH in the workers is not easy as it gets 
> overridden by someone (databricks/spark) along the way. a few ugly 
> workarounds are suggested like running a "dummy" UDF on the workers to add 
> the folder to the sys.path.
> I think all of that could easily be solved if we just add a dedicated 
> `spark.conf` the will get merged into the worker's PYTHONPATH, just here:
> [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94]
>  
> please tell me what you think, and I will make the PR.
> thanks.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers

2022-12-13 Thread Ohad Raviv (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646606#comment-17646606
 ] 

Ohad Raviv commented on SPARK-41510:


[~hvanhovell] - can you please refer that to someone?

> Support easy way for user defined PYTHONPATH in workers
> ---
>
> Key: SPARK-41510
> URL: https://issues.apache.org/jira/browse/SPARK-41510
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.1
>Reporter: Ohad Raviv
>Priority: Minor
>
> When working interactively with Spark through notebooks in various environments
> (Databricks/YARN), I often run into a very frustrating process when trying to add
> new Python modules, or change their code, without starting a new Spark
> session/cluster.
> On the driver side it is easy to add things like `sys.path.append()`, but if, for
> example, a UDF imports a function from a local module, the pickle boundaries
> assume that the module already exists on the workers, and the job fails with
> "python module does not exist..".
> To update the code "online" I can add an NFS volume to the workers' PYTHONPATH.
> However, setting the PYTHONPATH on the workers is not easy, as it gets
> overridden by someone (Databricks/Spark) along the way. A few ugly
> workarounds are suggested, like running a "dummy" UDF on the workers to add
> the folder to sys.path.
> I think all of that could easily be solved if we just add a dedicated
> `spark.conf` that will get merged into the worker's PYTHONPATH, just here:
> [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94]
>  
> please tell me what you think, and I will make the PR.
> thanks.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41360) Avoid BlockManager re-registration if the executor has been lost

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646605#comment-17646605
 ] 

Apache Spark commented on SPARK-41360:
--

User 'HyukjinKwon' has created a pull request for this issue:
https://github.com/apache/spark/pull/39052

> Avoid BlockManager re-registration if the executor has been lost
> 
>
> Key: SPARK-41360
> URL: https://issues.apache.org/jira/browse/SPARK-41360
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Assignee: wuyi
>Priority: Major
> Fix For: 3.2.4, 3.3.2, 3.4.0
>
>
> We should avoid BlockManager re-registration if the executor has already been lost,
> as it is meaningless and harmful; see, e.g., SPARK-35011.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41360) Avoid BlockManager re-registration if the executor has been lost

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646603#comment-17646603
 ] 

Apache Spark commented on SPARK-41360:
--

User 'HyukjinKwon' has created a pull request for this issue:
https://github.com/apache/spark/pull/39052

> Avoid BlockManager re-registration if the executor has been lost
> 
>
> Key: SPARK-41360
> URL: https://issues.apache.org/jira/browse/SPARK-41360
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Assignee: wuyi
>Priority: Major
> Fix For: 3.2.4, 3.3.2, 3.4.0
>
>
> We should avoid BlockManager re-registration if the executor has already been lost,
> as it is meaningless and harmful; see, e.g., SPARK-35011.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41511) LongToUnsafeRowMap support ignoresDuplicatedKey

2022-12-13 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-41511:


Assignee: (was: Apache Spark)

> LongToUnsafeRowMap support ignoresDuplicatedKey
> ---
>
> Key: SPARK-41511
> URL: https://issues.apache.org/jira/browse/SPARK-41511
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: XiDuo You
>Priority: Major
>
> For left semi and left anti hash joins, duplicated keys on the build side carry
> no meaning.
> Previously, we added support for ignoring duplicated keys in UnsafeHashedRelation. We
> can apply the same optimization to LongHashedRelation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers

2022-12-13 Thread Ohad Raviv (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ohad Raviv updated SPARK-41510:
---
Description: 
When working interactively with Spark through notebooks in various environments
(Databricks/YARN), I often run into a very frustrating process when trying to add
new Python modules, or change their code, without starting a new Spark
session/cluster.

On the driver side it is easy to add things like `sys.path.append()`, but if, for
example, a UDF imports a function from a local module, the pickle boundaries
assume that the module already exists on the workers, and the job fails with
"python module does not exist..".

To update the code "online" I can add an NFS volume to the workers' PYTHONPATH.

However, setting the PYTHONPATH on the workers is not easy, as it gets
overridden by someone (Databricks/Spark) along the way. A few ugly workarounds
are suggested, like running a "dummy" UDF on the workers to add the folder to
sys.path.

I think all of that could easily be solved if we just add a dedicated
`spark.conf` that will get merged into the worker's PYTHONPATH, just here:

[https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94]

 

please tell me what you think, and I will make the PR.

thanks.

 

 

  was:
When working interactively with Spark through notebooks in various environments
(Databricks/YARN), I often run into a very frustrating process when trying to add
new Python modules, or change their code, without starting a new Spark
session/cluster.

On the driver side it is easy to add things like `sys.path.append()`, but if, for
example, a UDF imports a function from a local module, the pickle boundaries
assume that the module exists on the workers, and I then fail with
"python module does not exist..".

Adding NFS volumes to the workers' PYTHONPATH could solve it, but it requires
restarting the session/cluster and, worse, does not work in all environments, as the
PYTHONPATH gets overridden by someone (Databricks/Spark) along the way. A few
ugly workarounds are suggested, like running a "dummy" UDF on the workers to add the
folder to sys.path.

I think all of that could easily be solved if we add a spark.conf that is appended to the
worker PYTHONPATH, here:

[https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94]

 

please tell me what you think, and I will make the PR.

thanks.

 

 


> Support easy way for user defined PYTHONPATH in workers
> ---
>
> Key: SPARK-41510
> URL: https://issues.apache.org/jira/browse/SPARK-41510
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.1
>Reporter: Ohad Raviv
>Priority: Minor
>
> When working interactively with Spark through notebooks in various environments
> (Databricks/YARN), I often run into a very frustrating process when trying to add
> new Python modules, or change their code, without starting a new Spark
> session/cluster.
> On the driver side it is easy to add things like `sys.path.append()`, but if, for
> example, a UDF imports a function from a local module, the pickle boundaries
> assume that the module already exists on the workers, and the job fails with
> "python module does not exist..".
> To update the code "online" I can add an NFS volume to the workers' PYTHONPATH.
> However, setting the PYTHONPATH on the workers is not easy, as it gets
> overridden by someone (Databricks/Spark) along the way. A few ugly
> workarounds are suggested, like running a "dummy" UDF on the workers to add
> the folder to sys.path.
> I think all of that could easily be solved if we just add a dedicated
> `spark.conf` that will get merged into the worker's PYTHONPATH, just here:
> [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94]
>  
> please tell me what you think, and I will make the PR.
> thanks.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41511) LongToUnsafeRowMap support ignoresDuplicatedKey

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646600#comment-17646600
 ] 

Apache Spark commented on SPARK-41511:
--

User 'ulysses-you' has created a pull request for this issue:
https://github.com/apache/spark/pull/39051

> LongToUnsafeRowMap support ignoresDuplicatedKey
> ---
>
> Key: SPARK-41511
> URL: https://issues.apache.org/jira/browse/SPARK-41511
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: XiDuo You
>Priority: Major
>
> For left semi and left anti hash joins, duplicated keys on the build side carry
> no meaning.
> Previously, we added support for ignoring duplicated keys in UnsafeHashedRelation. We
> can apply the same optimization to LongHashedRelation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41511) LongToUnsafeRowMap support ignoresDuplicatedKey

2022-12-13 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-41511:


Assignee: Apache Spark

> LongToUnsafeRowMap support ignoresDuplicatedKey
> ---
>
> Key: SPARK-41511
> URL: https://issues.apache.org/jira/browse/SPARK-41511
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: XiDuo You
>Assignee: Apache Spark
>Priority: Major
>
> For left semi and left anti hash joins, duplicated keys on the build side carry
> no meaning.
> Previously, we added support for ignoring duplicated keys in UnsafeHashedRelation. We
> can apply the same optimization to LongHashedRelation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers

2022-12-13 Thread Ohad Raviv (Jira)
Ohad Raviv created SPARK-41510:
--

 Summary: Support easy way for user defined PYTHONPATH in workers
 Key: SPARK-41510
 URL: https://issues.apache.org/jira/browse/SPARK-41510
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.3.1
Reporter: Ohad Raviv


When working interactively with Spark through notebooks in various environments
(Databricks/YARN), I often run into a very frustrating process when trying to add
new Python modules, or change their code, without starting a new Spark
session/cluster.

On the driver side it is easy to add things like `sys.path.append()`, but if, for
example, a UDF imports a function from a local module, the pickle boundaries
assume that the module exists on the workers, and I then fail with
"python module does not exist..".

Adding NFS volumes to the workers' PYTHONPATH could solve it, but it requires
restarting the session/cluster and, worse, does not work in all environments, as the
PYTHONPATH gets overridden by someone (Databricks/Spark) along the way. A few
ugly workarounds are suggested, like running a "dummy" UDF on the workers to add the
folder to sys.path.

I think all of that could easily be solved if we add a spark.conf that is appended to the
worker PYTHONPATH, here:

[https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94]

 

please tell me what you think, and I will make the PR.

thanks.

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41511) LongToUnsafeRowMap support ignoresDuplicatedKey

2022-12-13 Thread XiDuo You (Jira)
XiDuo You created SPARK-41511:
-

 Summary: LongToUnsafeRowMap support ignoresDuplicatedKey
 Key: SPARK-41511
 URL: https://issues.apache.org/jira/browse/SPARK-41511
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.4.0
Reporter: XiDuo You


For left semi and left anti hash joins, duplicated keys on the build side carry
no meaning.

Previously, we added support for ignoring duplicated keys in UnsafeHashedRelation. We can
apply the same optimization to LongHashedRelation.
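
A small PySpark sketch (with hypothetical data, not taken from this ticket) illustrating why duplicate build-side keys cannot change the result of a left semi join, which is why keeping only one row per key in the hashed relation is safe:

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

probe = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["k", "v"])
build = spark.createDataFrame([(1,), (1,), (1,), (2,)], ["k"])  # duplicated build-side keys
build_dedup = build.distinct()                                  # one row per key

# A left semi join only asks "does a matching key exist?", so deduplicating the
# build side produces exactly the same output.
with_dups = probe.join(build, "k", "left_semi").sort("k").collect()
no_dups = probe.join(build_dedup, "k", "left_semi").sort("k").collect()
assert with_dups == no_dups
{code}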



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41412) Implement `Cast`

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646594#comment-17646594
 ] 

Apache Spark commented on SPARK-41412:
--

User 'HyukjinKwon' has created a pull request for this issue:
https://github.com/apache/spark/pull/39050

> Implement `Cast`
> 
>
> Key: SPARK-41412
> URL: https://issues.apache.org/jira/browse/SPARK-41412
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect
>Affects Versions: 3.4.0
>Reporter: Rui Wang
>Assignee: Rui Wang
>Priority: Major
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41412) Implement `Cast`

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646593#comment-17646593
 ] 

Apache Spark commented on SPARK-41412:
--

User 'HyukjinKwon' has created a pull request for this issue:
https://github.com/apache/spark/pull/39050

> Implement `Cast`
> 
>
> Key: SPARK-41412
> URL: https://issues.apache.org/jira/browse/SPARK-41412
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect
>Affects Versions: 3.4.0
>Reporter: Rui Wang
>Assignee: Rui Wang
>Priority: Major
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-41509) Delay execution hash until after aggregation for semi-join runtime filter.

2022-12-13 Thread jiaan.geng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiaan.geng updated SPARK-41509:
---
Description: 
Currently, the Spark runtime filter supports a bloom filter and an in-subquery filter.
The in-subquery filter always executes Murmur3Hash before aggregating the join key.

Because the data size before aggregation is larger than after it, we can delay
executing Murmur3Hash until after aggregation for the semi-join runtime filter; this
reduces the number of calls to Murmur3Hash and improves performance.
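
A rough PySpark illustration of the idea, with hypothetical column names; F.hash is the Murmur3-based hash exposed in pyspark.sql.functions. Hashing after deduplicating the join keys yields the same set of hash values while evaluating the hash far fewer times when keys are heavily duplicated:

{code:python}
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# 1,000,000 rows but only 1,000 distinct join keys (hypothetical build side).
build = spark.range(0, 1_000_000).select((F.col("id") % 1000).alias("join_key"))

# Hash before aggregation: Murmur3Hash is evaluated once per input row.
hash_then_agg = build.select(F.hash("join_key").alias("h")).distinct()

# Hash after aggregation: keys are deduplicated first, so Murmur3Hash is
# evaluated only once per distinct key.
agg_then_hash = build.select("join_key").distinct().select(F.hash("join_key").alias("h"))

assert {r.h for r in hash_then_agg.collect()} == {r.h for r in agg_then_hash.collect()}
{code}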

> Delay execution hash until after aggregation for semi-join runtime filter.
> --
>
> Key: SPARK-41509
> URL: https://issues.apache.org/jira/browse/SPARK-41509
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: jiaan.geng
>Priority: Major
>
> Currently, the Spark runtime filter supports a bloom filter and an in-subquery filter.
> The in-subquery filter always executes Murmur3Hash before aggregating the join
> key.
> Because the data size before aggregation is larger than after it, we can delay
> executing Murmur3Hash until after aggregation for the semi-join runtime filter;
> this reduces the number of calls to Murmur3Hash and improves performance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41509) Delay execution hash until after aggregation for semi-join runtime filter.

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646581#comment-17646581
 ] 

Apache Spark commented on SPARK-41509:
--

User 'beliefer' has created a pull request for this issue:
https://github.com/apache/spark/pull/39049

> Delay execution hash until after aggregation for semi-join runtime filter.
> --
>
> Key: SPARK-41509
> URL: https://issues.apache.org/jira/browse/SPARK-41509
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: jiaan.geng
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41509) Delay execution hash until after aggregation for semi-join runtime filter.

2022-12-13 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-41509:


Assignee: (was: Apache Spark)

> Delay execution hash until after aggregation for semi-join runtime filter.
> --
>
> Key: SPARK-41509
> URL: https://issues.apache.org/jira/browse/SPARK-41509
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: jiaan.geng
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41509) Delay execution hash until after aggregation for semi-join runtime filter.

2022-12-13 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-41509:


Assignee: Apache Spark

> Delay execution hash until after aggregation for semi-join runtime filter.
> --
>
> Key: SPARK-41509
> URL: https://issues.apache.org/jira/browse/SPARK-41509
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: jiaan.geng
>Assignee: Apache Spark
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41509) Delay execution hash until after aggregation for semi-join runtime filter.

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646579#comment-17646579
 ] 

Apache Spark commented on SPARK-41509:
--

User 'beliefer' has created a pull request for this issue:
https://github.com/apache/spark/pull/39049

> Delay execution hash until after aggregation for semi-join runtime filter.
> --
>
> Key: SPARK-41509
> URL: https://issues.apache.org/jira/browse/SPARK-41509
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: jiaan.geng
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41509) Delay execution hash until after aggregation for semi-join runtime filter.

2022-12-13 Thread jiaan.geng (Jira)
jiaan.geng created SPARK-41509:
--

 Summary: Delay execution hash until after aggregation for 
semi-join runtime filter.
 Key: SPARK-41509
 URL: https://issues.apache.org/jira/browse/SPARK-41509
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: jiaan.geng






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41319) when-otherwise support

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646575#comment-17646575
 ] 

Apache Spark commented on SPARK-41319:
--

User 'zhengruifeng' has created a pull request for this issue:
https://github.com/apache/spark/pull/38956

> when-otherwise support
> --
>
> Key: SPARK-41319
> URL: https://issues.apache.org/jira/browse/SPARK-41319
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect
>Affects Versions: 3.4.0
>Reporter: Ruifeng Zheng
>Priority: Major
>
> 1. Add a protobuf message for the 'CaseWhen' expression;
> 2. Support the 'Column.\{when, otherwise\}' methods in Spark Connect.
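
For context, the existing pyspark.sql.functions when/otherwise API that the CaseWhen message would need to cover, shown on hypothetical data (this is the current DataFrame API, not the Connect protocol change itself):

{code:python}
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(5,), (25,), (65,)], ["age"])

# Column.when / Column.otherwise build a CaseWhen expression.
labeled = df.select(
    "age",
    F.when(F.col("age") < 18, "minor")
     .when(F.col("age") < 65, "adult")
     .otherwise("senior")
     .alias("bucket"),
)
labeled.show()
{code}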



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41319) when-otherwise support

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646574#comment-17646574
 ] 

Apache Spark commented on SPARK-41319:
--

User 'zhengruifeng' has created a pull request for this issue:
https://github.com/apache/spark/pull/38956

> when-otherwise support
> --
>
> Key: SPARK-41319
> URL: https://issues.apache.org/jira/browse/SPARK-41319
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect
>Affects Versions: 3.4.0
>Reporter: Ruifeng Zheng
>Priority: Major
>
> 1. Add a protobuf message for the 'CaseWhen' expression;
> 2. Support the 'Column.\{when, otherwise\}' methods in Spark Connect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41508) assign name to _LEGACY_ERROR_TEMP_1179 and unwrap the existing SparkThrowable

2022-12-13 Thread Yang Jie (Jira)
Yang Jie created SPARK-41508:


 Summary:  assign name to _LEGACY_ERROR_TEMP_1179 and unwrap the 
existing SparkThrowable
 Key: SPARK-41508
 URL: https://issues.apache.org/jira/browse/SPARK-41508
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core, SQL
Affects Versions: 3.4.0
Reporter: Yang Jie






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-41508) Assign name to _LEGACY_ERROR_TEMP_1179 and unwrap the existing SparkThrowable

2022-12-13 Thread Yang Jie (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Jie updated SPARK-41508:
-
Summary:  Assign name to _LEGACY_ERROR_TEMP_1179 and unwrap the existing 
SparkThrowable  (was:  assign name to _LEGACY_ERROR_TEMP_1179 and unwrap the 
existing SparkThrowable)

>  Assign name to _LEGACY_ERROR_TEMP_1179 and unwrap the existing SparkThrowable
> --
>
> Key: SPARK-41508
> URL: https://issues.apache.org/jira/browse/SPARK-41508
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core, SQL
>Affects Versions: 3.4.0
>Reporter: Yang Jie
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41406) Refactor error message for `NUM_COLUMNS_MISMATCH` to make it more generic

2022-12-13 Thread Max Gekk (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Max Gekk reassigned SPARK-41406:


Assignee: BingKun Pan

> Refactor error message for `NUM_COLUMNS_MISMATCH` to make it more generic
> -
>
> Key: SPARK-41406
> URL: https://issues.apache.org/jira/browse/SPARK-41406
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: BingKun Pan
>Assignee: BingKun Pan
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-41406) Refactor error message for `NUM_COLUMNS_MISMATCH` to make it more generic

2022-12-13 Thread Max Gekk (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Max Gekk resolved SPARK-41406.
--
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 38937
[https://github.com/apache/spark/pull/38937]

> Refactor error message for `NUM_COLUMNS_MISMATCH` to make it more generic
> -
>
> Key: SPARK-41406
> URL: https://issues.apache.org/jira/browse/SPARK-41406
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: BingKun Pan
>Assignee: BingKun Pan
>Priority: Minor
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41507) Correct group of collection_funcs

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646509#comment-17646509
 ] 

Apache Spark commented on SPARK-41507:
--

User 'LuciferYang' has created a pull request for this issue:
https://github.com/apache/spark/pull/39045

> Correct group of collection_funcs
> -
>
> Key: SPARK-41507
> URL: https://issues.apache.org/jira/browse/SPARK-41507
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Yang Jie
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41424) Protobuf serializer for TaskDataWrapper

2022-12-13 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646508#comment-17646508
 ] 

Apache Spark commented on SPARK-41424:
--

User 'gengliangwang' has created a pull request for this issue:
https://github.com/apache/spark/pull/39048

> Protobuf serializer for TaskDataWrapper
> ---
>
> Key: SPARK-41424
> URL: https://issues.apache.org/jira/browse/SPARK-41424
> Project: Spark
>  Issue Type: Sub-task
>  Components: Web UI
>Affects Versions: 3.4.0
>Reporter: Gengliang Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41507) Correct group of collection_funcs

2022-12-13 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-41507:


Assignee: (was: Apache Spark)

> Correct group of collection_funcs
> -
>
> Key: SPARK-41507
> URL: https://issues.apache.org/jira/browse/SPARK-41507
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Yang Jie
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


