[jira] [Commented] (SPARK-41506) Refactor LiteralExpression to support DataType
[ https://issues.apache.org/jira/browse/SPARK-41506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646976#comment-17646976 ]

Apache Spark commented on SPARK-41506:
--------------------------------------

User 'zhengruifeng' has created a pull request for this issue:
https://github.com/apache/spark/pull/39060

> Refactor LiteralExpression to support DataType
> ----------------------------------------------
>
>                 Key: SPARK-41506
>                 URL: https://issues.apache.org/jira/browse/SPARK-41506
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Connect, PySpark
>    Affects Versions: 3.4.0
>            Reporter: Ruifeng Zheng
>            Assignee: Ruifeng Zheng
>            Priority: Major
>             Fix For: 3.4.0

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646971#comment-17646971 ]

wuyi edited comment on SPARK-41497 at 12/14/22 7:31 AM:
--------------------------------------------------------

I'm wondering if we could improve the improved Option 4 by changing the rdd cache reuse condition a bit: if no accumulator (probably external only) values changed after the rdd computation, then the rdd's cache should be marked as usable/visible whether the task succeeds or fails; if accumulator values did change after the rdd computation, then the rdd's cache should be marked as usable/visible only when the task succeeds. (Let me think further and see if it's doable.)

was (Author: ngone51):
I'm wondering if we could improve the improved Option 4 by changing the rdd cache reuse condition a bit: if no accumulator (probably external only) values changed after the rdd computation, then the rdd's cache should be marked as usable/visible whether the task succeeds or fails. (Let me think further and see if it's doable.)

> Accumulator undercounting in the case of retry task with rdd cache
> ------------------------------------------------------------------
>
>                 Key: SPARK-41497
>                 URL: https://issues.apache.org/jira/browse/SPARK-41497
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>            Reporter: wuyi
>            Priority: Major
>
> An accumulator can be undercounted when a retried task has an rdd cache. See the example below; a complete, reproducible example is at
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
>     .setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task attempt of the job
>   // submitted below. In particular, the failed first attempt succeeds at its computation
>   // (accumulator accounting, result caching) but fails to report its success status due
>   // to the concurrent executor loss. The second task attempt succeeds.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate an rdd with only one partition so there's only one task, and specify the
>   // storage level MEMORY_ONLY_2 so that the rdd result will be cached on both executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
>     myAcc.add(100)
>     iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail because `myAcc.add(100)` won't be executed during the second task
>   // attempt's execution: the second attempt loads the rdd cache directly instead of
>   // executing the task function, so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>
> We can also hit this issue with decommission even if the rdd has only one copy. For example, decommission could migrate the rdd cache block to another executor (the result is effectively the same as having 2 copies) and the decommissioned executor could be lost before the task reports its success status to the driver.
>
> The issue is more complicated to fix than expected. I have tried several fixes, but none of them is ideal:
> Option 1: Clean up any rdd cache related to the failed task. In practice, this option already fixes the issue in most cases. However, theoretically, an rdd cache could be reported to the driver right after the driver cleans up the failed task's caches, due to asynchronous communication, so this option can't resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across task attempts for the same task. This option fixes the issue completely, but it also affects cases where the rdd cache could safely be reused across attempts (e.g., when there is no accumulator operation in the task), which can cause a perf regression;
> Option 3: Introduce an accumulator cache. First, this requires a new framework for supporting the accumulator cache; second, the driver needs additional logic to distinguish whether the cached accumulator value should be reported to the user, to avoid overcounting. For example, in the case of task retry the value should be reported, but in the case of rdd cache reuse it shouldn't be (should it?);
> Option 4: Do task success validation when a task tries to load the rdd cache. This defines an rdd cache as valid/accessible only if the producing task has succeeded. This could be either overkill or a bit complex, because Spark currently cleans up the task state once the task is finished, so we would need to maintain a structure recording whether a task once succeeded.
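The cache reuse condition wuyi proposes above can be modeled outside Spark as a small bookkeeping sketch. This is plain Python with entirely hypothetical names (`CacheVisibility` and its methods are not Spark APIs); it only illustrates the decision rule: a cached block is immediately reusable if its computation touched no accumulators, and otherwise becomes reusable only once the producing task is known to have succeeded.

```python
# Illustrative model of the improved Option 4 cache-visibility rule.
# All names here are hypothetical, not Spark internals.

class CacheVisibility:
    def __init__(self):
        self._visible = set()   # block ids safe to reuse across task attempts
        self._pending = {}      # block id -> awaiting task-success confirmation

    def on_block_cached(self, block_id, accumulators_changed):
        if not accumulators_changed:
            # No accumulator updates during computation: reusable regardless
            # of whether the task later succeeds or fails to report.
            self._visible.add(block_id)
        else:
            # Accumulator updates happened: reuse only after the producing
            # task succeeds, so a retry re-runs the accumulator updates.
            self._pending[block_id] = True

    def on_task_succeeded(self, block_ids):
        for b in block_ids:
            if self._pending.pop(b, False):
                self._visible.add(b)

    def is_visible(self, block_id):
        return block_id in self._visible


v = CacheVisibility()
v.on_block_cached("rdd_0_0", accumulators_changed=True)
assert not v.is_visible("rdd_0_0")   # task has not reported success yet
v.on_task_succeeded(["rdd_0_0"])
assert v.is_visible("rdd_0_0")

v.on_block_cached("rdd_1_0", accumulators_changed=False)
assert v.is_visible("rdd_1_0")       # visible even if the task fails to report
```

In this model, the SPARK-41497 repro would take the pending path: the first attempt updates `myAcc`, so its cached block stays invisible until a successful status report, forcing the retry to recompute and re-add the accumulator value.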
[jira] [Commented] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers
[ https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646973#comment-17646973 ]

Ohad Raviv commented on SPARK-41510:
------------------------------------

OK, after diving into the code I think I found what I was looking for:
{code:java}
spark._sc._python_includes.append("/shared_nfs/my_folder") {code}
but it is kind of hacky. Tell me what you think, and whether we could make it more official/documented.

> Support easy way for user defined PYTHONPATH in workers
> -------------------------------------------------------
>
>                 Key: SPARK-41510
>                 URL: https://issues.apache.org/jira/browse/SPARK-41510
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.3.1
>            Reporter: Ohad Raviv
>            Priority: Minor
>
> When working interactively with Spark through notebooks in various envs (Databricks/YARN), I often run into a frustrating process when trying to add new python modules, or even change their code, without starting a new spark session/cluster.
> On the driver side it is easy to add things like `sys.path.append()`, but if, for example, a UDF imports a function from a local module, the pickle boundaries assume that the module exists on the workers, and the UDF fails with "python module does not exist..".
> To update the code "online" I can add an NFS volume to the workers' PYTHONPATH. However, setting PYTHONPATH on the workers is not easy, as it gets overridden by someone (Databricks/Spark) along the way. A few ugly workarounds are suggested, like running a "dummy" UDF on the workers to add the folder to sys.path.
> I think all of that could easily be solved if we just added a dedicated `spark.conf` that gets merged into the worker's PYTHONPATH, right here:
> [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94]
>
> Please tell me what you think, and I will make the PR.
> Thanks.
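The proposal above, a dedicated conf merged into the worker's PYTHONPATH, can be sketched in plain Python. The conf key `spark.executorEnv.pythonPathExtra` and the helper below are hypothetical (the real change would live in `PythonWorkerFactory` on the JVM side); this only illustrates the intended merge semantics: user paths are prepended so they win over whatever the platform sets.

```python
import os

# Hypothetical sketch: build the PYTHONPATH handed to Python worker
# processes by prepending a user-configured conf value to whatever
# PYTHONPATH the environment already provides.

def build_worker_pythonpath(conf, inherited_pythonpath=""):
    """Prepend user-configured extra paths to the worker's PYTHONPATH."""
    extra = conf.get("spark.executorEnv.pythonPathExtra", "")
    parts = [p for p in extra.split(os.pathsep) if p]
    if inherited_pythonpath:
        parts.append(inherited_pythonpath)
    return os.pathsep.join(parts)


conf = {"spark.executorEnv.pythonPathExtra": "/shared_nfs/my_folder"}
merged = build_worker_pythonpath(conf, inherited_pythonpath="/opt/spark/python")
# On POSIX this yields "/shared_nfs/my_folder:/opt/spark/python", so the
# NFS folder is searched first and is never clobbered by the platform.
```

Because the user path comes first, editing modules under the shared folder takes effect on the workers without restarting the session, which is the interactive workflow the ticket asks for.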
[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646972#comment-17646972 ]

Mridul Muralidharan commented on SPARK-41497:
---------------------------------------------

[~Ngone51] Agree, that is what I was not sure of. I would expect this scenario (even without an accumulator) to be infrequent enough that the cost of the extra recomputation might be fine.
[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646971#comment-17646971 ]

wuyi commented on SPARK-41497:
------------------------------

I'm wondering if we could improve the improved Option 4 by changing the rdd cache reuse condition a bit: if no accumulator (probably external only) values changed after the rdd computation, then the rdd's cache should be marked as usable/visible whether the task succeeds or fails. (Let me think further and see if it's doable.)
[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646969#comment-17646969 ]

wuyi commented on SPARK-41497:
------------------------------

> do we have a way to do that?

[~mridulm80] Currently, we only have the mapping between tasks and accumulators. Accumulators are registered to the task via TaskContext.get() when they are deserialized at the executor. If we had a way to know which RDD scope an accumulator belongs to when deserializing it, we could then set up the mapping between RDDs and accumulators. This is probably the most difficult part.
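The missing RDD-to-accumulator mapping wuyi describes can be modeled as a small sketch. This is plain Python with hypothetical names (not Spark's TaskContext API): today an accumulator registers only with the running task when deserialized; the sketch adds an RDD-scope dimension at registration time so the driver side could later ask "which accumulators did this rdd's computation touch?".

```python
# Illustrative model, not Spark internals. All names are hypothetical.

class TaskContext:
    _current = None

    def __init__(self):
        self.task_to_accums = []   # the mapping Spark has today
        self.rdd_to_accums = {}    # the per-RDD-scope mapping that is missing
        self._rdd_scope = None

    @classmethod
    def get(cls):
        # Mirrors the pattern of looking up the active context on an executor.
        return cls._current

    def in_rdd_scope(self, rdd_id):
        # Record which rdd's compute() we are currently inside.
        self._rdd_scope = rdd_id
        return self

    def register(self, accum_name):
        # Called when an accumulator is deserialized on the executor.
        self.task_to_accums.append(accum_name)
        if self._rdd_scope is not None:
            self.rdd_to_accums.setdefault(self._rdd_scope, []).append(accum_name)


TaskContext._current = TaskContext()
ctx = TaskContext.get().in_rdd_scope(rdd_id=0)
ctx.register("myAcc")
assert ctx.task_to_accums == ["myAcc"]
assert ctx.rdd_to_accums == {0: ["myAcc"]}
```

With such a mapping, the cache-visibility decision could be made per rdd rather than per task: only blocks whose rdd scope registered accumulators would need to wait for task success.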
[jira] [Comment Edited] (SPARK-38719) Test the error class: CANNOT_CAST_DATATYPE
[ https://issues.apache.org/jira/browse/SPARK-38719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646963#comment-17646963 ]

Jayadeep Jayaraman edited comment on SPARK-38719 at 12/14/22 6:56 AM:
----------------------------------------------------------------------

[~maxgekk] - I tried creating the failure as shown below, but somehow this specific error does not show up. Can you suggest what the issue might be?

{code:java}
scala> val null_data = Seq(
     |   (1, ("ABC", null)),
     |   (2, ("MNO", null)),
     |   (3, ("PQR", null))
     | )
null_data: Seq[(Int, (String, Null))] = List((1,(ABC,null)), (2,(MNO,null)), (3,(PQR,null)))

scala> val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: null>]

scala> df.printSchema()
root
 |-- _1: integer (nullable = false)
 |-- _2: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: null (nullable = true)

scala> df.withColumn("_2._2", col("_2._2").cast(IntegerType)).show()
+---+-----------+-----+
| _1|         _2|_2._2|
+---+-----------+-----+
|  1|{ABC, null}| null|
|  2|{MNO, null}| null|
|  3|{PQR, null}| null|
+---+-----------+-----+
{code}

was (Author: jjayadeep):
[~maxgekk] - I tried creating the failure as shown below, but somehow this specific error does not show up. Can you suggest what the issue might be?

{code:java}
scala> val null_data = Seq(
     |   (1, ("ABC", null, "value12")),
     |   (2, ("MNO", null, "value22")),
     |   (3, ("PQR", null, "value32"))
     | )
null_data: Seq[(Int, (String, Null, String))] = List((1,(ABC,null,value12)), (2,(MNO,null,value22)), (3,(PQR,null,value32)))

scala> val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: null ... 1 more field>]

scala> val null_data = Seq(
     |   (1, ("ABC", null)),
     |   (2, ("MNO", null)),
     |   (3, ("PQR", null))
     | )
null_data: Seq[(Int, (String, Null))] = List((1,(ABC,null)), (2,(MNO,null)), (3,(PQR,null)))

scala> val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: null>]

scala> df.printSchema()
root
 |-- _1: integer (nullable = false)
 |-- _2: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: null (nullable = true)

scala> df.withColumn("_2._2", col("_2._2").cast(IntegerType)).show()
+---+-----------+-----+
| _1|         _2|_2._2|
+---+-----------+-----+
|  1|{ABC, null}| null|
|  2|{MNO, null}| null|
|  3|{PQR, null}| null|
+---+-----------+-----+
{code}

> Test the error class: CANNOT_CAST_DATATYPE
> ------------------------------------------
>
>                 Key: SPARK-38719
>                 URL: https://issues.apache.org/jira/browse/SPARK-38719
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Max Gekk
>            Priority: Minor
>              Labels: starter
>
> Add at least one test for the error class *CANNOT_CAST_DATATYPE* to QueryExecutionErrorsSuite. The test should cover the exception thrown in QueryExecutionErrors:
> {code:scala}
>   def cannotCastFromNullTypeError(to: DataType): Throwable = {
>     new SparkException(errorClass = "CANNOT_CAST_DATATYPE",
>       messageParameters = Array(NullType.typeName, to.typeName), null)
>   }
> {code}
> For example, here is a test for the error class *UNSUPPORTED_FEATURE*:
> https://github.com/apache/spark/blob/34e3029a43d2a8241f70f2343be8285cb7f231b9/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala#L151-L170
> +The test must have a check of:+
> # the entire error message
> # sqlState if it is defined in the error-classes.json file
> # the error class
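The ticket asks the test to verify three things: the error class, the sqlState (when defined), and the entire message. As a language-neutral illustration of what those checks amount to, here is a plain-Python model with a made-up error-classes entry and message template (the real test must read Spark's error-classes.json and be written in Scala in QueryExecutionErrorsSuite; nothing below is Spark's actual data):

```python
# Hypothetical, minimal model of the three required checks.
# ERROR_CLASSES and the message template are invented for illustration.

ERROR_CLASSES = {
    "CANNOT_CAST_DATATYPE": {
        "message": "Cannot cast {0} to {1}.",
        "sqlState": None,   # some classes define no sqlState; check only if set
    },
}

def render(error_class, params):
    """Expand the message template with the exception's messageParameters."""
    return ERROR_CLASSES[error_class]["message"].format(*params)

# Simulated exception payload, mirroring cannotCastFromNullTypeError's shape
# (error class plus messageParameters = [NullType.typeName, to.typeName]).
err = {
    "errorClass": "CANNOT_CAST_DATATYPE",
    "messageParameters": ("void", "int"),
    "sqlState": None,
}

# 1. the error class
assert err["errorClass"] in ERROR_CLASSES
# 2. sqlState, only if it is defined for this class
expected_state = ERROR_CLASSES[err["errorClass"]]["sqlState"]
if expected_state is not None:
    assert err["sqlState"] == expected_state
# 3. the entire error message
assert render(err["errorClass"], err["messageParameters"]) == "Cannot cast void to int."
```

The REPL experiment above never reaches this code path because casting a struct field of null type simply yields nulls; triggering cannotCastFromNullTypeError requires a code path that actually calls it, which is what the suite test needs to pin down.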
[jira] [Comment Edited] (SPARK-38719) Test the error class: CANNOT_CAST_DATATYPE
[ https://issues.apache.org/jira/browse/SPARK-38719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646963#comment-17646963 ]

Jayadeep Jayaraman edited comment on SPARK-38719 at 12/14/22 6:55 AM:
----------------------------------------------------------------------

I tried creating the failure as shown below, but somehow this specific error does not show up. Can you suggest what the issue might be?

{code:java}
scala> val null_data = Seq(
     |   (1, ("ABC", null, "value12")),
     |   (2, ("MNO", null, "value22")),
     |   (3, ("PQR", null, "value32"))
     | )
null_data: Seq[(Int, (String, Null, String))] = List((1,(ABC,null,value12)), (2,(MNO,null,value22)), (3,(PQR,null,value32)))

scala> val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: null ... 1 more field>]

scala> val null_data = Seq(
     |   (1, ("ABC", null)),
     |   (2, ("MNO", null)),
     |   (3, ("PQR", null))
     | )
null_data: Seq[(Int, (String, Null))] = List((1,(ABC,null)), (2,(MNO,null)), (3,(PQR,null)))

scala> val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: null>]

scala> df.printSchema()
root
 |-- _1: integer (nullable = false)
 |-- _2: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: null (nullable = true)

scala> df.withColumn("_2._2", col("_2._2").cast(IntegerType)).show()
+---+-----------+-----+
| _1|         _2|_2._2|
+---+-----------+-----+
|  1|{ABC, null}| null|
|  2|{MNO, null}| null|
|  3|{PQR, null}| null|
+---+-----------+-----+
{code}

was (Author: jjayadeep):
I tried creating the failure as shown below, but somehow this specific error does not show up. Can you suggest what the issue might be?

```
scala> val null_data = Seq(
     |   (1, ("ABC", null, "value12")),
     |   (2, ("MNO", null, "value22")),
     |   (3, ("PQR", null, "value32"))
     | )
null_data: Seq[(Int, (String, Null, String))] = List((1,(ABC,null,value12)), (2,(MNO,null,value22)), (3,(PQR,null,value32)))

scala> val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: null ... 1 more field>]

scala> val null_data = Seq(
     |   (1, ("ABC", null)),
     |   (2, ("MNO", null)),
     |   (3, ("PQR", null))
     | )
null_data: Seq[(Int, (String, Null))] = List((1,(ABC,null)), (2,(MNO,null)), (3,(PQR,null)))

scala> val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: null>]

scala> df.printSchema()
root
 |-- _1: integer (nullable = false)
 |-- _2: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: null (nullable = true)

scala> df.withColumn("_2._2", col("_2._2").cast(IntegerType)).show()
+---+-----------+-----+
| _1|         _2|_2._2|
+---+-----------+-----+
|  1|{ABC, null}| null|
|  2|{MNO, null}| null|
|  3|{PQR, null}| null|
+---+-----------+-----+
```
[jira] [Commented] (SPARK-38719) Test the error class: CANNOT_CAST_DATATYPE
[ https://issues.apache.org/jira/browse/SPARK-38719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646963#comment-17646963 ] Jayadeep Jayaraman commented on SPARK-38719: I tried reproducing the failure as shown below, but this specific error does not show up. Can you suggest what the issue might be?
```
scala> val null_data = Seq(
     |   (1, ("ABC", null, "value12")),
     |   (2, ("MNO", null, "value22")),
     |   (3, ("PQR", null, "value32"))
     | )
null_data: Seq[(Int, (String, Null, String))] = List((1,(ABC,null,value12)), (2,(MNO,null,value22)), (3,(PQR,null,value32)))

scala> val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: null ... 1 more field>]

scala> val null_data = Seq(
     |   (1, ("ABC", null)),
     |   (2, ("MNO", null)),
     |   (3, ("PQR", null))
     | )
null_data: Seq[(Int, (String, Null))] = List((1,(ABC,null)), (2,(MNO,null)), (3,(PQR,null)))

scala> val df = null_data.toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: struct<_1: string, _2: null>]

scala> df.printSchema()
root
 |-- _1: integer (nullable = false)
 |-- _2: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: null (nullable = true)

scala> df.withColumn("_2._2", col("_2._2").cast(IntegerType)).show()
+---+-----------+-----+
| _1|         _2|_2._2|
+---+-----------+-----+
|  1|{ABC, null}| null|
|  2|{MNO, null}| null|
|  3|{PQR, null}| null|
+---+-----------+-----+
```
> Test the error class: CANNOT_CAST_DATATYPE > -- > > Key: SPARK-38719 > URL: https://issues.apache.org/jira/browse/SPARK-38719 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: Max Gekk >Priority: Minor > Labels: starter > > Add at least one test for the error class *CANNOT_CAST_DATATYPE* to > QueryExecutionErrorsSuite. The test should cover the exception thrown in > QueryExecutionErrors: > {code:scala} > def cannotCastFromNullTypeError(to: DataType): Throwable = { > new SparkException(errorClass = "CANNOT_CAST_DATATYPE", > messageParameters = Array(NullType.typeName, to.typeName), null) > } > {code} > For example, here is a test for the error class *UNSUPPORTED_FEATURE*: > https://github.com/apache/spark/blob/34e3029a43d2a8241f70f2343be8285cb7f231b9/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala#L151-L170 > +The test must have a check of:+ > # the entire error message > # sqlState if it is defined in the error-classes.json file > # the error class -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
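The three-point checklist above (entire message, sqlState, error class) can be sketched outside Spark. The Python below is a hypothetical, framework-free analogue of such a test: the `ERROR_CLASSES` entry, field names, and message template are illustrative stand-ins, not Spark's actual error-classes.json schema, and the real suite would of course be written in Scala against `SparkException`.

```python
import json

# Hypothetical, minimal stand-in for an entry in error-classes.json;
# the real Spark file has a different shape and many more classes.
ERROR_CLASSES = json.loads("""
{
  "CANNOT_CAST_DATATYPE": {
    "message": ["Cannot cast <sourceType> to <targetType>."],
    "sqlState": "42846"
  }
}
""")

def check_error(exc, error_class, parameters, expected_message):
    """Apply the three checks the ticket asks for: error class,
    sqlState (when defined), and the fully rendered message."""
    entry = ERROR_CLASSES[error_class]
    assert exc["errorClass"] == error_class
    if "sqlState" in entry:
        assert exc["sqlState"] == entry["sqlState"]
    rendered = " ".join(entry["message"])
    for name, value in parameters.items():
        rendered = rendered.replace(f"<{name}>", value)
    assert rendered == expected_message, rendered
    return True

# A fake exception, as a dict, standing in for SparkException.
exc = {
    "errorClass": "CANNOT_CAST_DATATYPE",
    "sqlState": "42846",
    "message": "Cannot cast NULL to INT.",
}
assert check_error(
    exc, "CANNOT_CAST_DATATYPE",
    {"sourceType": "NULL", "targetType": "INT"},
    "Cannot cast NULL to INT.",
)
```

The point of rendering the full template rather than substring-matching is that the test then fails if either the template or the parameters drift.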
[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646962#comment-17646962 ] Mridul Muralidharan commented on SPARK-41497: - Agreed. If we can determine that, do we have a way to do so? > Accumulator undercounting in the case of retry task with rdd cache > -- > > Key: SPARK-41497 > URL: https://issues.apache.org/jira/browse/SPARK-41497 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1 >Reporter: wuyi >Priority: Major > > An accumulator can be undercounted when a retried task has an rdd cache. See > the example below; you can also find the complete, reproducible example at > [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc] > > {code:scala} > test("SPARK-XXX") { > // Set up a cluster with 2 executors > val conf = new SparkConf() > .setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite") > sc = new SparkContext(conf) > // Set up a custom task scheduler. The scheduler will fail the first task attempt of the job > // submitted below. In particular, the failed first attempt would succeed in its computation > // (accumulator accounting, result caching) but fail to report its success status due > // to the concurrent executor loss. The second task attempt will succeed. > taskScheduler = setupSchedulerWithCustomStatusUpdate(sc) > val myAcc = sc.longAccumulator("myAcc") > // Initialize an rdd with only one partition so there's only one task, and specify the storage level > // as MEMORY_ONLY_2 so that the rdd result will be cached on both executors. > val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter => > myAcc.add(100) > iter.map(x => x + 1) > }.persist(StorageLevel.MEMORY_ONLY_2) > // This will pass since the second task attempt will succeed > assert(rdd.count() === 10) > // This will fail because `myAcc.add(100)` won't be executed during the second task attempt: > // the second attempt loads the rdd cache directly instead of executing the task function, > // so `myAcc.add(100)` is skipped. > assert(myAcc.value === 100) > } {code} > > We can also hit this issue with decommission even if the rdd has only one copy. For example, decommission could migrate the rdd cache block to another executor (the result is effectively the same as having 2 copies) and the decommissioned executor is lost before the task reports its success status to the driver. > > The issue is more complicated to fix than expected. I have tried several fixes, but none of them is ideal: > Option 1: Clean up any rdd cache related to the failed task: in practice, this option already fixes the issue in most cases. However, theoretically, an rdd cache could be reported to the driver right after the driver cleans up the failed task's caches, due to asynchronous communication. So this option can't resolve the issue thoroughly; > Option 2: Disallow rdd cache reuse across the task attempts for the same task: this option can 100% fix the issue. The problem is that it also affects cases where the rdd cache could safely be reused across attempts (e.g., when there is no accumulator operation in the task), which can cause a perf regression; > Option 3: Introduce an accumulator cache: first, this requires a new framework for supporting accumulator caching; second, the driver has to improve its logic to distinguish whether the cached accumulator value should be reported to the user, to avoid overcounting. For example, in the case of task retry, the value should be reported. However, in the case of rdd cache reuse, the value shouldn't be reported (should it?); > Option 4: Do task success validation when a task tries to load the rdd cache: this defines an rdd cache as valid/accessible only if the task has succeeded. This could be either overkill or a bit complex (currently Spark cleans up task state once a task finishes, so we would need to maintain a structure recording whether a task once succeeded or not). -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
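The failure mode described in the ticket can be sketched without Spark at all. The Python below is a framework-free simulation: the cache, the task retry, and the driver-side merging of accumulator deltas are modeled with plain data structures, not Spark's actual scheduler, so all names here are illustrative.

```python
# Simulates: attempt 1 computes the partition (adding to its accumulator
# delta) and caches the result, but its status update is lost, so the
# driver discards its delta. Attempt 2 hits the cache, skips the task
# body, and succeeds with a zero delta. The merged value undercounts.

class TaskContext:
    def __init__(self):
        self.acc_delta = 0  # per-attempt accumulator updates

cache = {}        # block id -> cached partition data (survives task failure)
driver_acc = 0    # the accumulator value the driver reports to the user

def run_task(partition_id, ctx, fail_status_report=False):
    block = ("rdd_0", partition_id)
    if block in cache:
        return cache[block]            # cache hit: task body (and add) skipped
    ctx.acc_delta += 100               # myAcc.add(100) inside the task
    result = [x + 1 for x in range(10)]
    cache[block] = result              # the cache is written before reporting
    if fail_status_report:
        raise RuntimeError("executor lost before status update")
    return result

# Attempt 1: computes and caches, but the driver never sees success,
# so ctx1.acc_delta is NOT merged (failed tasks' updates are dropped).
ctx1 = TaskContext()
try:
    run_task(0, ctx1, fail_status_report=True)
except RuntimeError:
    pass

# Attempt 2: served from cache, never executes the add, and succeeds.
ctx2 = TaskContext()
result = run_task(0, ctx2)
driver_acc += ctx2.acc_delta           # only successful attempts are merged

assert len(result) == 10
assert ctx1.acc_delta == 100  # the work happened exactly once...
assert driver_acc == 0        # ...but the merged value undercounts (expected 100)
```

Option 2 above corresponds to deleting the `if block in cache` early return for retried attempts; Option 4 corresponds to gating that early return on a "task once succeeded" record.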
[jira] [Commented] (SPARK-41506) Refactor LiteralExpression to support DataType
[ https://issues.apache.org/jira/browse/SPARK-41506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646960#comment-17646960 ] Apache Spark commented on SPARK-41506: -- User 'dengziming' has created a pull request for this issue: https://github.com/apache/spark/pull/39059 > Refactor LiteralExpression to support DataType > -- > > Key: SPARK-41506 > URL: https://issues.apache.org/jira/browse/SPARK-41506 > Project: Spark > Issue Type: Sub-task > Components: Connect, PySpark >Affects Versions: 3.4.0 >Reporter: Ruifeng Zheng >Assignee: Ruifeng Zheng >Priority: Major > Fix For: 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-41515) PVC-oriented executor pod allocation
[ https://issues.apache.org/jira/browse/SPARK-41515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-41515: -- Labels: releasenotes (was: ) > PVC-oriented executor pod allocation > > > Key: SPARK-41515 > URL: https://issues.apache.org/jira/browse/SPARK-41515 > Project: Spark > Issue Type: New Feature > Components: Kubernetes >Affects Versions: 3.4.0 >Reporter: Dongjoon Hyun >Priority: Major > Labels: releasenotes > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-39324) Log ExecutorDecommission as INFO level in TaskSchedulerImpl
[ https://issues.apache.org/jira/browse/SPARK-39324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-39324: -- Parent: SPARK-41515 Issue Type: Sub-task (was: Improvement) > Log ExecutorDecommission as INFO level in TaskSchedulerImpl > --- > > Key: SPARK-39324 > URL: https://issues.apache.org/jira/browse/SPARK-39324 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 3.4.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Major > Fix For: 3.4.0 > > > Like the other modules, `TaskSchedulerImpl` should log the decommission at > `INFO` level. > {code} > 22/05/28 01:25:28 INFO KubernetesClusterSchedulerBackend: Decommission > executors: 8 > 22/05/28 01:25:28 INFO KubernetesClusterSchedulerBackend: Notify executor 8 > to decommissioning. > 22/05/28 01:25:28 INFO BlockManagerMasterEndpoint: Mark BlockManagers > (BlockManagerId(8, 100.103.40.13, 43353, None)) as being decommissioning. > 22/05/28 01:25:29 ERROR TaskSchedulerImpl: Lost executor 8 on 100.103.40.13: > Executor decommission. > 22/05/28 01:25:29 INFO ExecutorMonitor: Executor 8 is removed. Remove reason > statistics: ... > 22/05/28 01:25:29 INFO DAGScheduler: Executor lost: 8 (epoch 7) > 22/05/28 01:25:29 INFO BlockManagerMasterEndpoint: Trying to remove executor > 8 from BlockManagerMaster. > 22/05/28 01:25:29 INFO BlockManagerMasterEndpoint: Removing block manager > BlockManagerId(8, 100.103.40.13, 43353, None) > 22/05/28 01:25:29 INFO BlockManagerMaster: Removed 8 successfully in > removeExecutor > 22/05/28 01:25:29 INFO DAGScheduler: Shuffle files lost for executor: 8 > (epoch 7) > 22/05/28 01:25:34 INFO BlockManagerMaster: Removal of executor 8 requested > 22/05/28 01:25:34 INFO BlockManagerMasterEndpoint: Trying to remove executor > 8 from BlockManagerMaster. 
> 22/05/28 01:25:34 INFO > KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asked to remove > non-existent executor 8 > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-39450) Reuse PVCs by default
[ https://issues.apache.org/jira/browse/SPARK-39450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-39450: -- Parent: SPARK-41515 Issue Type: Sub-task (was: Improvement) > Reuse PVCs by default > - > > Key: SPARK-39450 > URL: https://issues.apache.org/jira/browse/SPARK-39450 > Project: Spark > Issue Type: Sub-task > Components: Kubernetes >Affects Versions: 3.4.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Major > Fix For: 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-39688) getReusablePVCs should handle accounts with no PVC permission
[ https://issues.apache.org/jira/browse/SPARK-39688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-39688: -- Parent: SPARK-41515 Issue Type: Sub-task (was: Bug) > getReusablePVCs should handle accounts with no PVC permission > - > > Key: SPARK-39688 > URL: https://issues.apache.org/jira/browse/SPARK-39688 > Project: Spark > Issue Type: Sub-task > Components: Kubernetes >Affects Versions: 3.4.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Major > Fix For: 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-39898) Upgrade kubernetes-client to 5.12.3
[ https://issues.apache.org/jira/browse/SPARK-39898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-39898: -- Parent: SPARK-41515 Issue Type: Sub-task (was: Bug) > Upgrade kubernetes-client to 5.12.3 > --- > > Key: SPARK-39898 > URL: https://issues.apache.org/jira/browse/SPARK-39898 > Project: Spark > Issue Type: Sub-task > Components: Build, Kubernetes >Affects Versions: 3.4.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Major > Fix For: 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-39846) Enable spark.dynamicAllocation.shuffleTracking.enabled by default
[ https://issues.apache.org/jira/browse/SPARK-39846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-39846: -- Parent: SPARK-41515 Issue Type: Sub-task (was: Improvement) > Enable spark.dynamicAllocation.shuffleTracking.enabled by default > - > > Key: SPARK-39846 > URL: https://issues.apache.org/jira/browse/SPARK-39846 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 3.4.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Major > Fix For: 3.4.0 > > > This issue aims to make Spark 3.4 track shuffle data when dynamic > allocation is enabled without a shuffle service. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-38719) Test the error class: CANNOT_CAST_DATATYPE
[ https://issues.apache.org/jira/browse/SPARK-38719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646942#comment-17646942 ] Max Gekk commented on SPARK-38719: -- [~jjayadeep] Sure, go ahead. > Test the error class: CANNOT_CAST_DATATYPE > -- > > Key: SPARK-38719 > URL: https://issues.apache.org/jira/browse/SPARK-38719 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: Max Gekk >Priority: Minor > Labels: starter > > Add at least one test for the error class *CANNOT_CAST_DATATYPE* to > QueryExecutionErrorsSuite. The test should cover the exception thrown in > QueryExecutionErrors: > {code:scala} > def cannotCastFromNullTypeError(to: DataType): Throwable = { > new SparkException(errorClass = "CANNOT_CAST_DATATYPE", > messageParameters = Array(NullType.typeName, to.typeName), null) > } > {code} > For example, here is a test for the error class *UNSUPPORTED_FEATURE*: > https://github.com/apache/spark/blob/34e3029a43d2a8241f70f2343be8285cb7f231b9/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala#L151-L170 > +The test must have a check of:+ > # the entire error message > # sqlState if it is defined in the error-classes.json file > # the error class -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-39965) Skip PVC cleanup when driver doesn't own PVCs
[ https://issues.apache.org/jira/browse/SPARK-39965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-39965: -- Parent: SPARK-41515 Issue Type: Sub-task (was: Bug) > Skip PVC cleanup when driver doesn't own PVCs > - > > Key: SPARK-39965 > URL: https://issues.apache.org/jira/browse/SPARK-39965 > Project: Spark > Issue Type: Sub-task > Components: Kubernetes >Affects Versions: 3.3.0 >Reporter: Pralabh Kumar >Assignee: Pralabh Kumar >Priority: Trivial > Fix For: 3.3.1, 3.2.3, 3.4.0 > > > Since Spark 3.2, as part of [https://github.com/apache/spark/pull/32288], functionality was added to delete PVCs if the Spark driver died. > [https://github.com/apache/spark/blob/786a70e710369b195d7c117b33fe9983044014d6/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala#L144] > > However, there are cases where Spark on K8s doesn't use PVCs and uses host path storage. > [https://spark.apache.org/docs/latest/running-on-kubernetes.html#using-kubernetes-volumes] > > In those cases: > * it requests PVC deletion (which is not required); > * it also tries to delete PVCs when the driver doesn't own the PV (or spark.kubernetes.driver.ownPersistentVolumeClaim is false); > * moreover, in clusters where the Spark user doesn't have access to list or delete PVCs, it throws an exception: > > io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: > GET at: > [https://kubernetes.default.svc/api/v1/namespaces/<>/persistentvolumeclaims?labelSelector=spark-app-selector%3Dspark-332bd09284b3442f8a6a214fabcd6ab1|https://kubernetes.default.svc/api/v1/namespaces/dpi-dev/persistentvolumeclaims?labelSelector=spark-app-selector%3Dspark-332bd09284b3442f8a6a214fabcd6ab1]. > Message: Forbidden!Configured service account doesn't have access. Service > account may have been revoked. persistentvolumeclaims is forbidden: User > "system:serviceaccount:dpi-dev:spark" cannot list resource > "persistentvolumeclaims" in API group "" in the namespace "<>". > > *Solution* > Ideally there should be a configuration spark.kubernetes.driver.pvc.deleteOnTermination, or spark.kubernetes.driver.ownPersistentVolumeClaim should be checked before calling the PVC delete API. If the user has not set up a PV, or the driver doesn't own it, there is no need to call the API and delete PVCs. > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
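The proposed fix direction (don't touch the PVC API when the driver owns no PVCs) can be sketched with a stub client. The names below (`stop_driver`, `StubK8sClient`, `own_pvc`) are illustrative assumptions; this is neither the fabric8 client nor Spark's actual shutdown path.

```python
# Sketch of the guard: check an ownership flag (standing in for a config
# such as spark.kubernetes.driver.ownPersistentVolumeClaim) before issuing
# the PVC list/delete call, so accounts without PVC permissions never hit
# the Forbidden error at all.

class ForbiddenError(Exception):
    pass

class StubK8sClient:
    """Simulates a service account with no PVC permissions."""
    def delete_pvcs(self, label_selector):
        raise ForbiddenError("persistentvolumeclaims is forbidden")

def stop_driver(client, own_pvc):
    if not own_pvc:
        return "skipped PVC cleanup"   # no PVCs owned: don't call the API
    client.delete_pvcs("spark-app-selector=spark-app-id")
    return "deleted PVCs"

client = StubK8sClient()
# A host-path-only deployment never touches the forbidden API.
assert stop_driver(client, own_pvc=False) == "skipped PVC cleanup"
```

The design point is simply that the guard lives on the caller's side, so the RBAC error cannot surface during an otherwise clean shutdown.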
[jira] [Updated] (SPARK-40198) Enable spark.storage.decommission.(rdd|shuffle)Blocks.enabled by default
[ https://issues.apache.org/jira/browse/SPARK-40198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-40198: -- Parent: SPARK-41515 Issue Type: Sub-task (was: Improvement) > Enable spark.storage.decommission.(rdd|shuffle)Blocks.enabled by default > > > Key: SPARK-40198 > URL: https://issues.apache.org/jira/browse/SPARK-40198 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 3.4.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Major > Fix For: 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-40304) Add decomTestTag to K8s Integration Test
[ https://issues.apache.org/jira/browse/SPARK-40304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-40304: -- Parent: SPARK-41515 Issue Type: Sub-task (was: Test) > Add decomTestTag to K8s Integration Test > > > Key: SPARK-40304 > URL: https://issues.apache.org/jira/browse/SPARK-40304 > Project: Spark > Issue Type: Sub-task > Components: Kubernetes, Tests >Affects Versions: 3.4.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Minor > Fix For: 3.3.1, 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-40459) recoverDiskStore should not stop by existing recomputed files
[ https://issues.apache.org/jira/browse/SPARK-40459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-40459: -- Parent: SPARK-41515 Issue Type: Sub-task (was: Bug) > recoverDiskStore should not stop by existing recomputed files > - > > Key: SPARK-40459 > URL: https://issues.apache.org/jira/browse/SPARK-40459 > Project: Spark > Issue Type: Sub-task > Components: Kubernetes >Affects Versions: 3.2.3, 3.3.2 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Major > Fix For: 3.3.1, 3.2.3, 3.4.0 > > > {code:java} > org.apache.commons.io.FileExistsException: File element in parameter 'null' > already exists: '...' > at org.apache.commons.io.FileUtils.requireAbsent(FileUtils.java:2587) > at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2305) > at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2283) > at > org.apache.spark.storage.DiskStore.moveFileToBlock(DiskStore.scala:150) > at > org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.saveToDiskStore(BlockManager.scala:487) > at > org.apache.spark.storage.BlockManager$BlockStoreUpdater.$anonfun$save$1(BlockManager.scala:407) > at > org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1445) > at > org.apache.spark.storage.BlockManager$BlockStoreUpdater.save(BlockManager.scala:380) > at > org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.save(BlockManager.scala:490) > at > org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.$anonfun$recoverDiskStore$14(KubernetesLocalDiskShuffleExecutorComponents.scala:95) > at > scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) > at > scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) > at > 
org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.recoverDiskStore(KubernetesLocalDiskShuffleExecutorComponents.scala:91) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
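One way to read the fix is that the recovery loop should tolerate a destination file that already exists (because the block was recomputed in the meantime) instead of aborting on the first `FileExistsException`. The Python below is a hedged sketch under that assumption; file naming and directory layout are illustrative, not Spark's on-disk format.

```python
import os
import tempfile

def move_file_to_block(src, dst):
    """Move a recovered temp file into the block location, skipping
    (and discarding the temp file) if the block already exists."""
    if os.path.exists(dst):
        os.remove(src)        # block was already recomputed: drop the temp file
        return False
    os.replace(src, dst)      # atomic move on the same filesystem
    return True

def recover_disk_store(temp_files, block_dir):
    """Recover as many blocks as possible; never abort mid-loop."""
    recovered = 0
    for src in temp_files:
        dst = os.path.join(block_dir, os.path.basename(src).removesuffix(".tmp"))
        if move_file_to_block(src, dst):
            recovered += 1
    return recovered

with tempfile.TemporaryDirectory() as d:
    block_dir = os.path.join(d, "blocks")
    os.makedirs(block_dir)
    # One block already exists (recomputed), one is new.
    open(os.path.join(block_dir, "shuffle_0_0_0.data"), "w").close()
    t1 = os.path.join(d, "shuffle_0_0_0.data.tmp")
    t2 = os.path.join(d, "shuffle_0_1_0.data.tmp")
    open(t1, "w").close()
    open(t2, "w").close()
    assert recover_disk_store([t1, t2], block_dir) == 1
```

The stack trace above fails inside `FileUtils.moveFile`, which (per commons-io semantics) refuses to overwrite an existing destination; handling the pre-existing file before the move sidesteps that.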
[jira] [Updated] (SPARK-41388) getReusablePVCs should ignore recently created PVCs in the previous batch
[ https://issues.apache.org/jira/browse/SPARK-41388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-41388: -- Parent: SPARK-41515 Issue Type: Sub-task (was: Bug) > getReusablePVCs should ignore recently created PVCs in the previous batch > - > > Key: SPARK-41388 > URL: https://issues.apache.org/jira/browse/SPARK-41388 > Project: Spark > Issue Type: Sub-task > Components: Kubernetes >Affects Versions: 3.2.2, 3.3.1, 3.4.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Major > Fix For: 3.2.4, 3.3.2, 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-41514) Add `PVC-oriented executor pod allocation` section and revise config name
[ https://issues.apache.org/jira/browse/SPARK-41514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun reassigned SPARK-41514: - Assignee: Dongjoon Hyun > Add `PVC-oriented executor pod allocation` section and revise config name > - > > Key: SPARK-41514 > URL: https://issues.apache.org/jira/browse/SPARK-41514 > Project: Spark > Issue Type: Sub-task > Components: Documentation, Kubernetes >Affects Versions: 3.4.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-41410) Support PVC-oriented executor pod allocation
[ https://issues.apache.org/jira/browse/SPARK-41410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-41410: -- Parent: SPARK-41515 Issue Type: Sub-task (was: New Feature) > Support PVC-oriented executor pod allocation > > > Key: SPARK-41410 > URL: https://issues.apache.org/jira/browse/SPARK-41410 > Project: Spark > Issue Type: Sub-task > Components: Kubernetes >Affects Versions: 3.4.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Major > Fix For: 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-41514) Add `PVC-oriented executor pod allocation` section and revise config name
[ https://issues.apache.org/jira/browse/SPARK-41514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-41514: -- Parent: SPARK-41515 Issue Type: Sub-task (was: Documentation) > Add `PVC-oriented executor pod allocation` section and revise config name > - > > Key: SPARK-41514 > URL: https://issues.apache.org/jira/browse/SPARK-41514 > Project: Spark > Issue Type: Sub-task > Components: Documentation, Kubernetes >Affects Versions: 3.4.0 >Reporter: Dongjoon Hyun >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-41515) PVC-oriented executor pod allocation
Dongjoon Hyun created SPARK-41515: - Summary: PVC-oriented executor pod allocation Key: SPARK-41515 URL: https://issues.apache.org/jira/browse/SPARK-41515 Project: Spark Issue Type: New Feature Components: Kubernetes Affects Versions: 3.4.0 Reporter: Dongjoon Hyun -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-38719) Test the error class: CANNOT_CAST_DATATYPE
[ https://issues.apache.org/jira/browse/SPARK-38719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646938#comment-17646938 ] Jayadeep Jayaraman commented on SPARK-38719: Hi [~maxgekk] - I would like to work on this task. Let me know if that would be okay. > Test the error class: CANNOT_CAST_DATATYPE > -- > > Key: SPARK-38719 > URL: https://issues.apache.org/jira/browse/SPARK-38719 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: Max Gekk >Priority: Minor > Labels: starter > > Add at least one test for the error class *CANNOT_CAST_DATATYPE* to > QueryExecutionErrorsSuite. The test should cover the exception thrown in > QueryExecutionErrors: > {code:scala} > def cannotCastFromNullTypeError(to: DataType): Throwable = { > new SparkException(errorClass = "CANNOT_CAST_DATATYPE", > messageParameters = Array(NullType.typeName, to.typeName), null) > } > {code} > For example, here is a test for the error class *UNSUPPORTED_FEATURE*: > https://github.com/apache/spark/blob/34e3029a43d2a8241f70f2343be8285cb7f231b9/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala#L151-L170 > +The test must have a check of:+ > # the entire error message > # sqlState if it is defined in the error-classes.json file > # the error class -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-41248) Add config flag to control before of JSON partial results parsing in SPARK-40646
[ https://issues.apache.org/jira/browse/SPARK-41248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Max Gekk reassigned SPARK-41248: Assignee: Ivan Sadikov > Add config flag to control before of JSON partial results parsing in > SPARK-40646 > > > Key: SPARK-41248 > URL: https://issues.apache.org/jira/browse/SPARK-41248 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.0 >Reporter: Ivan Sadikov >Assignee: Ivan Sadikov >Priority: Major > Attachments: json-benchmark-with-SPARK-40646.log, > json-benchmark-without-SPARK-40646.log > > > This is a follow-up for https://issues.apache.org/jira/browse/SPARK-40646. > > It was observed in internal benchmarks that the JSON partial results parsing > can be 30% slower compared to parsing without the patch. I could not find a > regression and the Apache Spark JSON benchmark results are very similar with > and without SPARK-40646. > However, I would still like to add a config flag to enable/disable the > feature in the case the regression is observed in users' queries. > Benchmark results are attached below. > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-41248) Add config flag to control before of JSON partial results parsing in SPARK-40646
[ https://issues.apache.org/jira/browse/SPARK-41248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Max Gekk resolved SPARK-41248. -- Fix Version/s: 3.4.0 Resolution: Fixed Issue resolved by pull request 38784 [https://github.com/apache/spark/pull/38784] > Add config flag to control before of JSON partial results parsing in > SPARK-40646 > > > Key: SPARK-41248 > URL: https://issues.apache.org/jira/browse/SPARK-41248 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.0 >Reporter: Ivan Sadikov >Assignee: Ivan Sadikov >Priority: Major > Fix For: 3.4.0 > > Attachments: json-benchmark-with-SPARK-40646.log, > json-benchmark-without-SPARK-40646.log > > > This is a follow-up for https://issues.apache.org/jira/browse/SPARK-40646. > > It was observed in internal benchmarks that the JSON partial results parsing > can be 30% slower compared to parsing without the patch. I could not find a > regression and the Apache Spark JSON benchmark results are very similar with > and without SPARK-40646. > However, I would still like to add a config flag to enable/disable the > feature in the case the regression is observed in users' queries. > Benchmark results are attached below. > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
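The intended semantics of such an on/off flag can be sketched independently of Spark: with partial-results parsing enabled, a malformed record keeps whatever fields did parse; with it disabled, the whole row becomes null. The salvage strategy and the flag name below are illustrative assumptions, not Spark's actual JSON parser.

```python
import json

def parse_record(line, columns, partial_results=True):
    """Parse one JSON line into a row tuple. `partial_results` stands in
    for a config flag gating best-effort recovery of malformed records."""
    try:
        data = json.loads(line)
        return tuple(data.get(c) for c in columns)
    except json.JSONDecodeError:
        if not partial_results:
            return tuple(None for _ in columns)
        # Best-effort salvage: re-parse the longest prefix that closes
        # into a valid object. Crude, but shows the flag's two behaviors.
        for end in range(len(line), 0, -1):
            try:
                data = json.loads(line[:end] + "}")
                return tuple(data.get(c) for c in columns)
            except json.JSONDecodeError:
                continue
        return tuple(None for _ in columns)

cols = ("a", "b")
bad = '{"a": 1, "b": oops'
assert parse_record(bad, cols, partial_results=True) == (1, None)
assert parse_record(bad, cols, partial_results=False) == (None, None)
```

A flag with exactly this shape lets users opt out if the extra salvage work causes the kind of slowdown the benchmarks hinted at, without reverting the feature.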
[jira] [Assigned] (SPARK-41514) Add `PVC-oriented executor pod allocation` section and revise config name
[ https://issues.apache.org/jira/browse/SPARK-41514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-41514: Assignee: Apache Spark > Add `PVC-oriented executor pod allocation` section and revise config name > - > > Key: SPARK-41514 > URL: https://issues.apache.org/jira/browse/SPARK-41514 > Project: Spark > Issue Type: Documentation > Components: Documentation, Kubernetes >Affects Versions: 3.4.0 >Reporter: Dongjoon Hyun >Assignee: Apache Spark >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-41514) Add `PVC-oriented executor pod allocation` section and revise config name
[ https://issues.apache.org/jira/browse/SPARK-41514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-41514: Assignee: (was: Apache Spark) > Add `PVC-oriented executor pod allocation` section and revise config name > - > > Key: SPARK-41514 > URL: https://issues.apache.org/jira/browse/SPARK-41514 > Project: Spark > Issue Type: Documentation > Components: Documentation, Kubernetes >Affects Versions: 3.4.0 >Reporter: Dongjoon Hyun >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-41514) Add `PVC-oriented executor pod allocation` section and revise config name
[ https://issues.apache.org/jira/browse/SPARK-41514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646931#comment-17646931 ] Apache Spark commented on SPARK-41514: -- User 'dongjoon-hyun' has created a pull request for this issue: https://github.com/apache/spark/pull/39058 > Add `PVC-oriented executor pod allocation` section and revise config name > - > > Key: SPARK-41514 > URL: https://issues.apache.org/jira/browse/SPARK-41514 > Project: Spark > Issue Type: Documentation > Components: Documentation, Kubernetes >Affects Versions: 3.4.0 >Reporter: Dongjoon Hyun >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-41514) Add `PVC-oriented executor pod allocation` section and revise config name
Dongjoon Hyun created SPARK-41514: - Summary: Add `PVC-oriented executor pod allocation` section and revise config name Key: SPARK-41514 URL: https://issues.apache.org/jira/browse/SPARK-41514 Project: Spark Issue Type: Documentation Components: Documentation, Kubernetes Affects Versions: 3.4.0 Reporter: Dongjoon Hyun -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-41409) Reuse `WRONG_NUM_ARGS` instead of `_LEGACY_ERROR_TEMP_1043`
[ https://issues.apache.org/jira/browse/SPARK-41409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Max Gekk reassigned SPARK-41409: Assignee: Yang Jie > Reuse `WRONG_NUM_ARGS` instead of `_LEGACY_ERROR_TEMP_1043` > --- > > Key: SPARK-41409 > URL: https://issues.apache.org/jira/browse/SPARK-41409 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: Yang Jie >Assignee: Yang Jie >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-41409) Reuse `WRONG_NUM_ARGS` instead of `_LEGACY_ERROR_TEMP_1043`
[ https://issues.apache.org/jira/browse/SPARK-41409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Max Gekk resolved SPARK-41409. -- Fix Version/s: 3.4.0 Resolution: Fixed Issue resolved by pull request 38940 [https://github.com/apache/spark/pull/38940] > Reuse `WRONG_NUM_ARGS` instead of `_LEGACY_ERROR_TEMP_1043` > --- > > Key: SPARK-41409 > URL: https://issues.apache.org/jira/browse/SPARK-41409 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: Yang Jie >Assignee: Yang Jie >Priority: Minor > Fix For: 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646919#comment-17646919 ] wuyi commented on SPARK-41497: -- [~mridulm80] For b) and c), shouldn't we allow T2 to use the result of T1's cache if rdd1's computation doesn't include any accumulators? > Accumulator undercounting in the case of retry task with rdd cache > -- > > Key: SPARK-41497 > URL: https://issues.apache.org/jira/browse/SPARK-41497 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1 >Reporter: wuyi >Priority: Major > > Accumulator could be undercounted when the retried task has rdd cache. See > the example below; you can also find the complete and reproducible > example at > [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc] > > {code:scala} > test("SPARK-XXX") { > // Set up a cluster with 2 executors > val conf = new SparkConf() > .setMaster("local-cluster[2, 1, > 1024]").setAppName("TaskSchedulerImplSuite") > sc = new SparkContext(conf) > // Set up a custom task scheduler. The scheduler will fail the first task > attempt of the job > // submitted below. In particular, the failed first task attempt would > succeed in its computation > // (accumulator accounting, result caching) but fail to report its > success status due > // to the concurrent executor loss. The second task attempt would succeed. > taskScheduler = setupSchedulerWithCustomStatusUpdate(sc) > val myAcc = sc.longAccumulator("myAcc") > // Initiate an rdd with only one partition so there's only one task, and > specify the storage level > // with MEMORY_ONLY_2 so that the rdd result will be cached on both > executors. 
> val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter => > myAcc.add(100) > iter.map(x => x + 1) > }.persist(StorageLevel.MEMORY_ONLY_2) > // This will pass since the second task attempt will succeed > assert(rdd.count() === 10) > // This will fail because `myAcc.add(100)` won't be executed during the > second task attempt's > execution: the second task attempt loads the rdd cache > directly instead of > executing the task function, so `myAcc.add(100)` is skipped. > assert(myAcc.value === 100) > } {code} > > We could also hit this issue with decommission even if the rdd only has one > copy. For example, decommission could migrate the rdd cache block to another > executor (the result is effectively the same as with 2 copies) and the > decommissioned executor is lost before the task reports its success status to > the driver. > > And the issue is a bit more complicated than expected to fix. I have tried > several fixes, but none of them is ideal: > Option 1: Clean up any rdd cache related to the failed task: in practice, > this option can already fix the issue in most cases. However, theoretically, > rdd cache could be reported to the driver right after the driver cleans up > the failed task's caches, due to asynchronous communication. So this option > can’t resolve the issue thoroughly; > Option 2: Disallow rdd cache reuse across the task attempts for the same > task: this option can 100% fix the issue. The problem is that it also > affects the case where rdd cache can be reused across the attempts (e.g., when > there is no accumulator operation in the task), which can cause a perf > regression; > Option 3: Introduce accumulator cache: first, this requires a new framework > for supporting accumulator cache; second, the driver should improve its logic > to distinguish whether the accumulator cache value should be reported to the > user to avoid overcounting. For example, in the case of task retry, the value > should be reported. 
However, in the case of rdd cache reuse, the value > shouldn’t be reported (should it?); > Option 4: Do task success validation when a task tries to load the rdd > cache: this way defines that an rdd cache is only valid/accessible if the task has > succeeded. This way could be either overkill or a bit complex (because > currently Spark cleans up the task state once it’s finished, so we would need > to maintain a structure to track whether a task once succeeded or not.) -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
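Option 4 above ("a rdd cache is only valid/accessible if the task has succeeded") can be illustrated with a toy model. This is a hedged sketch only: the names (`BlockId`, `VisibleCacheTracker`, the callback methods) are hypothetical and do not correspond to Spark's actual BlockManager or scheduler internals; it just shows the extra state the option would require the driver to maintain.

```scala
// Toy model of Option 4: a cached rdd block becomes visible to other task
// attempts only after the producing task attempt is confirmed successful.
// All names are hypothetical; this is NOT Spark's BlockManager logic.
import scala.collection.mutable

final case class BlockId(rddId: Int, partition: Int)

class VisibleCacheTracker {
  // block id -> task attempt id that produced (wrote) the cached block
  private val produced = mutable.Map.empty[BlockId, Long]
  // task attempt ids the driver has confirmed as successful
  private val succeeded = mutable.Set.empty[Long]

  def onBlockWritten(block: BlockId, taskAttemptId: Long): Unit =
    produced(block) = taskAttemptId

  def onTaskSucceeded(taskAttemptId: Long): Unit =
    succeeded += taskAttemptId

  // A retried task may reuse the cache only if the producer succeeded;
  // otherwise it must recompute (and so re-apply accumulator updates).
  def isVisible(block: BlockId): Boolean =
    produced.get(block).exists(succeeded.contains)
}
```

The "a bit complex" part of the option is visible here: `succeeded` is exactly the extra structure the comment mentions, since Spark normally discards finished task state.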
[jira] [Updated] (SPARK-41512) Row count based shuffle read to optimize global limit after a single partition shuffle (optionally with input partition sorted)
[ https://issues.apache.org/jira/browse/SPARK-41512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Wang updated SPARK-41512: - Description: h3. Problem Statement In the current Spark optimizer, a single partition shuffle might be created for a limit if this limit is not the last non-action operation (e.g. a filter follows the limit and the data size exceeds a threshold). The output partitions that feed into this limit may be sorted. The single partition shuffle approach has a correctness bug in this case: shuffle read partitions could arrive out of partition order, and the limit exec just takes the first `limit` rows, which can lose the order and thus produce a wrong result. This is a shuffle, so it is relatively costly. Meanwhile, a naive way to correct this bug is to sort all the data fed into the limit again, which is another overhead. h3. Proposed idea So we propose a row-count-based AQE algorithm that addresses this problem in two ways: 1. Avoid the extra sort on the shuffle read side (or with the limit exec) to achieve the correct result. 2. Avoid reading all shuffle data from mappers for this single partition shuffle, to reduce shuffle cost. Note that 1. applies only to the sorted-partition case, while 2. applies to the general single partition shuffle + limit case. The algorithm works as follows: 1. Each mapper records a row count when writing shuffle data. 2. Since this is the single shuffle partition case, there is only one partition but N mappers. 3. An AccumulatorV2 is implemented to collect a list of tuples recording the mapping between mapper id and the number of rows written by that mapper (row count metrics). 4. The AQE framework detects a plan shape of a shuffle plus a global limit. 5. The AQE framework reads only the necessary data from mappers based on the limit. 
For example, if mapper 1 writes 200 rows and mapper 2 writes 300 rows, and the limit is 500, AQE creates a shuffle read node that reads from both mapper 1 and mapper 2, thus skipping the remaining mappers. 6. This is correct for limits over both sorted and non-sorted partitions. Steps to solve this problem: 1. Add a per-mapper row count collector. 2. Add an AQE rule that uses the row count metrics to dynamically decide how much shuffle data to read from mappers. was: h3. Problem Statement In the current Spark optimizer, a single partition shuffle might be created for a limit if this limit is not the last non-action operation (e.g. a filter follows the limit and the data size exceeds a threshold). The output partitions that feed into this limit may be sorted. The single partition shuffle approach has a correctness bug in this case: shuffle read partitions could arrive out of partition order, and the limit exec just takes the first `limit` rows, which can lose the order and thus produce a wrong result. This is a shuffle, so it is relatively costly. Meanwhile, a naive way to correct this bug is to sort all the data fed into the limit again, which is another overhead. h3. Proposed idea So we propose a row-count-based AQE algorithm that addresses this problem in two ways: Avoid the extra sort on the shuffle read side (or with the limit exec) to achieve the correct result. Avoid reading all shuffle data from mappers for this single partition shuffle, to reduce shuffle cost. Note that 1. applies only to the sorted-partition case, while 2. applies to the general single partition shuffle + limit case. The algorithm works as follows: 1. Each mapper records a row count when writing shuffle data. 2. Since this is the single shuffle partition case, there is only one partition but N mappers. 3. An AccumulatorV2 is implemented to collect a list of tuples recording the mapping between mapper id and the number of rows written by that mapper (row count metrics). 4. 
The AQE framework detects a plan shape of a shuffle plus a global limit. 5. The AQE framework reads only the necessary data from mappers based on the limit. For example, if mapper 1 writes 200 rows and mapper 2 writes 300 rows, and the limit is 500, AQE creates a shuffle read node that reads from both mapper 1 and mapper 2, thus skipping the remaining mappers. 6. This is correct for limits over both sorted and non-sorted partitions. Steps to solve this problem: 1. Add a per-mapper row count collector. 2. Add an AQE rule that uses the row count metrics to dynamically decide how much shuffle data to read from mappers. > Row count based shuffle read to optimize global limit after a single > partition shuffle (optionally with input partition sorted) > --- > > Key: SPARK-41512 > URL: https://issues.apache.org/jira/browse/SPARK-41512 > Project: Spark > Issue Type: Task > Components: SQL >
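The mapper-selection step (step 5 in the description above) reduces to taking the shortest prefix of mappers whose accumulated row counts cover the limit. A minimal sketch, with a hypothetical helper name (the real AQE rule builds a shuffle-read node rather than returning ids; for the sorted-partition case the input is assumed ordered by mapper id so the prefix preserves order):

```scala
// Given per-mapper row counts (mapperId, rowCount) in mapper order, pick the
// smallest prefix of mappers whose combined row count covers the global limit.
// Hypothetical helper, not the actual AQE rule.
def mappersToRead(rowCounts: Seq[(Int, Long)], limit: Long): Seq[Int] = {
  var remaining = limit
  val picked = Seq.newBuilder[Int]
  val it = rowCounts.iterator
  while (remaining > 0 && it.hasNext) {
    val (mapperId, rows) = it.next()
    picked += mapperId
    remaining -= rows
  }
  picked.result()
}
```

With the description's example (mapper 1 → 200 rows, mapper 2 → 300 rows, limit 500), this selects mappers 1 and 2 and skips the remaining mappers.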
[jira] [Assigned] (SPARK-41513) Implement an Accumulator to collect per mapper row count metrics
[ https://issues.apache.org/jira/browse/SPARK-41513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-41513: Assignee: Rui Wang (was: Apache Spark) > Implement a Accumulator to collect per mapper row count metrics > --- > > Key: SPARK-41513 > URL: https://issues.apache.org/jira/browse/SPARK-41513 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: Rui Wang >Assignee: Rui Wang >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-41513) Implement an Accumulator to collect per mapper row count metrics
[ https://issues.apache.org/jira/browse/SPARK-41513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-41513: Assignee: Apache Spark (was: Rui Wang) > Implement a Accumulator to collect per mapper row count metrics > --- > > Key: SPARK-41513 > URL: https://issues.apache.org/jira/browse/SPARK-41513 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: Rui Wang >Assignee: Apache Spark >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-41513) Implement an Accumulator to collect per mapper row count metrics
[ https://issues.apache.org/jira/browse/SPARK-41513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646901#comment-17646901 ] Apache Spark commented on SPARK-41513: -- User 'amaliujia' has created a pull request for this issue: https://github.com/apache/spark/pull/39057 > Implement a Accumulator to collect per mapper row count metrics > --- > > Key: SPARK-41513 > URL: https://issues.apache.org/jira/browse/SPARK-41513 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: Rui Wang >Assignee: Rui Wang >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
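Spark's `AccumulatorV2[IN, OUT]` base class defines the contract such a per-mapper row-count accumulator must satisfy: `isZero`, `copy`, `reset`, `add`, `merge`, and `value`. The sketch below is illustrative only — the class name and shapes are assumptions, not the PR's actual implementation, and a local trait mirroring the contract is used so the snippet stays self-contained; in Spark one would extend `org.apache.spark.util.AccumulatorV2[(Int, Long), Map[Int, Long]]` instead.

```scala
import scala.collection.mutable

// Local stand-in for the AccumulatorV2[IN, OUT] method contract, so this
// sketch compiles without spark-core on the classpath.
trait AccumulatorContract[IN, OUT] {
  def isZero: Boolean
  def copy(): AccumulatorContract[IN, OUT]
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: AccumulatorContract[IN, OUT]): Unit
  def value: OUT
}

// Collects (mapperId, rowCount) pairs: each shuffle map task reports the
// number of rows it wrote; the driver merges per-executor partial results.
class MapperRowCountAccumulator
    extends AccumulatorContract[(Int, Long), Map[Int, Long]] {
  private val counts = mutable.Map.empty[Int, Long]

  def isZero: Boolean = counts.isEmpty
  def copy(): MapperRowCountAccumulator = {
    val acc = new MapperRowCountAccumulator
    acc.counts ++= counts
    acc
  }
  def reset(): Unit = counts.clear()
  def add(v: (Int, Long)): Unit =
    counts(v._1) = counts.getOrElse(v._1, 0L) + v._2
  def merge(other: AccumulatorContract[(Int, Long), Map[Int, Long]]): Unit =
    other.value.foreach { case (id, n) => add((id, n)) }
  def value: Map[Int, Long] = counts.toMap
}
```

The driver-side `value` then yields exactly the mapper-id → row-count mapping that the SPARK-41512 AQE rule needs to decide which mappers to read.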
[jira] [Updated] (SPARK-41512) Row count based shuffle read to optimize global limit after a single partition shuffle (optionally with input partition sorted)
[ https://issues.apache.org/jira/browse/SPARK-41512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Wang updated SPARK-41512: - Description: h3. Problem Statement In current Spark optimizer, a single partition shuffle might be created for a limit if this limit is not the last non-action operation (e.g. a filter following the limit and the data size exceeds a limit). There is a possibility that the previous output partitions before go into this limit are sorted. The single partition shuffle approach has a correctness bug in this case: shuffle read partitions could be out of partition order and the limit exec just take the first limit rows which could lose the order thus result into wrong result. This is a shuffle so it is relatively costly. Meanwhile, to correct this bug, a native solution is to sort all the data fed into limit again, which is another overhead. h3. Proposed idea So we propose a row count based AQE algorithm that optimizes this problem by two folds: Avoid the extra sort on the shuffle read side (or with the limit exec) to achieve the correct result. Avoid reading all shuffle data from mappers for this single partition shuffle to reduce shuffle cost. Note that 1. is only applied for the sorted partition case where 2. is applied for general single partition shuffle + limit case The algorithm works as the following: 1. Each mapper will record a row count when writing shuffle data. 2. Since this is single shuffle partition case, there is only one partition but N mappers. 3. A accumulatorV2 is implemented to collect a list of tuple which records the mapping between mapper id and the number of row written by the mapper (row count metrics) 4. AQE framework detects a plan shape of shuffle plus a global limit. 5. AQE framework reads necessary data from mappers based on the limit. 
For example, if mapper 1 writes 200 rows and mapper 2 writes 300 rows, and the limit is 500, AQE creates shuffle read node to write from both mapper 1 and 2, thus skip the left mappers. 6. This is both correct for limit with the sorted or non-sorted partitions. Steps to solve this problem: 1. Add a per mapper row count collector 2. Add a AQE rule to use the row count metrics to decide dynamic how many shuffle data from mappers to read. was: h3. Problem Statement In current Spark optimizer, a single partition shuffle might be created for a limit if this limit is not the last non-action operation (e.g. a filter following the limit). There is a possibility that the previous output partitions before go into this limit are sorted. The single partition shuffle approach has a correctness bug in this case: shuffle read partitions could be out of partition order and the limit exec just take the first limit rows which could lose the order thus result into wrong result. This is a shuffle so it is relatively costly. Meanwhile, to correct this bug, a native solution is to sort all the data fed into limit again, which is another overhead. h3. Proposed idea So we propose a row count based AQE algorithm that optimizes this problem by two folds: Avoid the extra sort on the shuffle read side (or with the limit exec) to achieve the correct result. Avoid reading all shuffle data from mappers for this single partition shuffle to reduce shuffle cost. Note that 1. is only applied for the sorted partition case where 2. is applied for general single partition shuffle + limit case The algorithm works as the following: 1. Each mapper will record a row count when writing shuffle data. 2. Since this is single shuffle partition case, there is only one partition but N mappers. 3. A accumulatorV2 is implemented to collect a list of tuple which records the mapping between mapper id and the number of row written by the mapper (row count metrics) 4. 
AQE framework detects a plan shape of shuffle plus a global limit. 5. AQE framework reads necessary data from mappers based on the limit. For example, if mapper 1 writes 200 rows and mapper 2 writes 300 rows, and the limit is 500, AQE creates shuffle read node to write from both mapper 1 and 2, thus skip the left mappers. 6. This is both correct for limit with the sorted or non-sorted partitions. Steps to solve this problem: 1. Add a per mapper row count collector 2. Add a AQE rule to use the row count metrics to decide dynamic how many shuffle data from mappers to read. > Row count based shuffle read to optimize global limit after a single > partition shuffle (optionally with input partition sorted) > --- > > Key: SPARK-41512 > URL: https://issues.apache.org/jira/browse/SPARK-41512 > Project: Spark > Issue Type: Task > Components: SQL >Affects Versions: 3.4.0 >
[jira] [Updated] (SPARK-41512) Row count based shuffle read to optimize global limit after a single partition shuffle (optionally with input partition sorted)
[ https://issues.apache.org/jira/browse/SPARK-41512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Wang updated SPARK-41512: - Description: h3. Problem Statement In current Spark optimizer, a single partition shuffle might be created for a limit if this limit is not the last non-action operation (e.g. a filter following the limit). There is a possibility that the previous output partitions before go into this limit are sorted. The single partition shuffle approach has a correctness bug in this case: shuffle read partitions could be out of partition order and the limit exec just take the first limit rows which could lose the order thus result into wrong result. This is a shuffle so it is relatively costly. Meanwhile, to correct this bug, a native solution is to sort all the data fed into limit again, which is another overhead. h3. Proposed idea So we propose a row count based AQE algorithm that optimizes this problem by two folds: Avoid the extra sort on the shuffle read side (or with the limit exec) to achieve the correct result. Avoid reading all shuffle data from mappers for this single partition shuffle to reduce shuffle cost. Note that 1. is only applied for the sorted partition case where 2. is applied for general single partition shuffle + limit case The algorithm works as the following: 1. Each mapper will record a row count when writing shuffle data. 2. Since this is single shuffle partition case, there is only one partition but N mappers. 3. A accumulatorV2 is implemented to collect a list of tuple which records the mapping between mapper id and the number of row written by the mapper (row count metrics) 4. AQE framework detects a plan shape of shuffle plus a global limit. 5. AQE framework reads necessary data from mappers based on the limit. For example, if mapper 1 writes 200 rows and mapper 2 writes 300 rows, and the limit is 500, AQE creates shuffle read node to write from both mapper 1 and 2, thus skip the left mappers. 6. 
This is both correct for limit with the sorted or non-sorted partitions. Steps to solve this problem: 1. Add a per mapper row count collector 2. Add a AQE rule to use the row count metrics to decide dynamic how many shuffle data from mappers to read. was: h3. Problem Statement In current Spark optimizer, a single partition shuffle might be created for a limit if this limit is not the last non-action operation (e.g. a filter following the limit). There is a possibility that the previous output partitions before go into this limit are sorted. The single partition shuffle approach has a correctness bug in this case: shuffle read partitions could be out of partition order and the limit exec just take the first limit rows which could lose the order thus result into wrong result. This is a shuffle so it is relatively costly. Meanwhile, to correct this bug, a native solution is to sort all the data fed into limit again, which is another overhead. h3. Proposed idea So we propose a row count based AQE algorithm that optimizes this problem by two folds: Avoid the extra sort on the shuffle read side (or with the limit exec) to achieve the correct result. Avoid reading all shuffle data from mappers for this single partition shuffle to reduce shuffle cost. Note that 1. is only applied for the sorted partition case where 2. is applied for general single partition shuffle + limit case The algorithm works as the following: 1. Each mapper will record a row count when writing shuffle data. 2. Since this is single shuffle partition case, there is only one partition but N mappers. 3. A accumulatorV2 is implemented to collect a list of tuple which records the mapping between mapper id and the number of row written by the mapper (row count metrics) 4. AQE framework detects a plan shape of shuffle plus a global limit. 5. AQE framework reads necessary data from mappers based on the limit. 
For example, if mapper 1 writes 200 rows and mapper 2 writes 300 rows, and the limit is 500, AQE creates shuffle read node to write from both mapper 1 and 2, thus skip the left mappers. 6. This is both correct for limit with the sorted or non-sorted partitions. > Row count based shuffle read to optimize global limit after a single > partition shuffle (optionally with input partition sorted) > --- > > Key: SPARK-41512 > URL: https://issues.apache.org/jira/browse/SPARK-41512 > Project: Spark > Issue Type: Task > Components: SQL >Affects Versions: 3.4.0 >Reporter: Rui Wang >Assignee: Rui Wang >Priority: Major > > h3. Problem Statement > In current Spark optimizer, a single partition shuffle might be created for a > limit if this limit is not the
[jira] [Created] (SPARK-41513) Implement an Accumulator to collect per mapper row count metrics
Rui Wang created SPARK-41513: Summary: Implement a Accumulator to collect per mapper row count metrics Key: SPARK-41513 URL: https://issues.apache.org/jira/browse/SPARK-41513 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 3.4.0 Reporter: Rui Wang Assignee: Rui Wang -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-41512) Row count based shuffle read to optimize global limit after a single partition shuffle (optionally with input partition sorted)
[ https://issues.apache.org/jira/browse/SPARK-41512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646898#comment-17646898 ] Rui Wang commented on SPARK-41512: -- cc [~cloud_fan] > Row count based shuffle read to optimize global limit after a single > partition shuffle (optionally with input partition sorted) > --- > > Key: SPARK-41512 > URL: https://issues.apache.org/jira/browse/SPARK-41512 > Project: Spark > Issue Type: Task > Components: SQL >Affects Versions: 3.4.0 >Reporter: Rui Wang >Assignee: Rui Wang >Priority: Major > > h3. Problem Statement > In current Spark optimizer, a single partition shuffle might be created for a > limit if this limit is not the last non-action operation (e.g. a filter > following the limit). There is a possibility that the previous output > partitions before go into this limit are sorted. The single partition shuffle > approach has a correctness bug in this case: shuffle read partitions could be > out of partition order and the limit exec just take the first limit rows > which could lose the order thus result into wrong result. This is a shuffle > so it is relatively costly. Meanwhile, to correct this bug, a native solution > is to sort all the data fed into limit again, which is another overhead. > h3. Proposed idea > So we propose a row count based AQE algorithm that optimizes this problem by > two folds: > Avoid the extra sort on the shuffle read side (or with the limit exec) to > achieve the correct result. > Avoid reading all shuffle data from mappers for this single partition shuffle > to reduce shuffle cost. > Note that 1. is only applied for the sorted partition case where 2. is > applied for general single partition shuffle + limit case > > The algorithm works as the following: > 1. Each mapper will record a row count when writing shuffle data. > 2. Since this is single shuffle partition case, there is only one partition > but N mappers. > 3. 
A accumulatorV2 is implemented to collect a list of tuple which records > the mapping between mapper id and the number of row written by the mapper > (row count metrics) > 4. AQE framework detects a plan shape of shuffle plus a global limit. > 5. AQE framework reads necessary data from mappers based on the limit. For > example, if mapper 1 writes 200 rows and mapper 2 writes 300 rows, and the > limit is 500, AQE creates shuffle read node to write from both mapper 1 and > 2, thus skip the left mappers. > 6. This is both correct for limit with the sorted or non-sorted partitions. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-41512) Row count based shuffle read to optimize global limit after a single partition shuffle (optionally with input partition sorted)
Rui Wang created SPARK-41512: Summary: Row count based shuffle read to optimize global limit after a single partition shuffle (optionally with input partition sorted) Key: SPARK-41512 URL: https://issues.apache.org/jira/browse/SPARK-41512 Project: Spark Issue Type: Task Components: SQL Affects Versions: 3.4.0 Reporter: Rui Wang Assignee: Rui Wang h3. Problem Statement In the current Spark optimizer, a single partition shuffle might be created for a limit if this limit is not the last non-action operation (e.g. a filter follows the limit). The output partitions that feed into this limit may be sorted. The single partition shuffle approach has a correctness bug in this case: shuffle read partitions could arrive out of partition order, and the limit exec just takes the first `limit` rows, which can lose the order and thus produce a wrong result. This is a shuffle, so it is relatively costly. Meanwhile, a naive way to correct this bug is to sort all the data fed into the limit again, which is another overhead. h3. Proposed idea So we propose a row-count-based AQE algorithm that addresses this problem in two ways: Avoid the extra sort on the shuffle read side (or with the limit exec) to achieve the correct result. Avoid reading all shuffle data from mappers for this single partition shuffle, to reduce shuffle cost. Note that 1. applies only to the sorted-partition case, while 2. applies to the general single partition shuffle + limit case. The algorithm works as follows: 1. Each mapper records a row count when writing shuffle data. 2. Since this is the single shuffle partition case, there is only one partition but N mappers. 3. An AccumulatorV2 is implemented to collect a list of tuples recording the mapping between mapper id and the number of rows written by that mapper (row count metrics). 4. The AQE framework detects a plan shape of a shuffle plus a global limit. 5. The AQE framework reads only the necessary data from mappers based on the limit. 
For example, if mapper 1 writes 200 rows and mapper 2 writes 300 rows, and the limit is 500, AQE creates a shuffle read node that reads from both mapper 1 and mapper 2, thus skipping the remaining mappers. 6. This is correct for limits over both sorted and non-sorted partitions. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-41506) Refactor LiteralExpression to support DataType
[ https://issues.apache.org/jira/browse/SPARK-41506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646896#comment-17646896 ] Apache Spark commented on SPARK-41506: -- User 'zhengruifeng' has created a pull request for this issue: https://github.com/apache/spark/pull/39056 > Refactor LiteralExpression to support DataType > -- > > Key: SPARK-41506 > URL: https://issues.apache.org/jira/browse/SPARK-41506 > Project: Spark > Issue Type: Sub-task > Components: Connect, PySpark >Affects Versions: 3.4.0 >Reporter: Ruifeng Zheng >Assignee: Ruifeng Zheng >Priority: Major > Fix For: 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-41506) Refactor LiteralExpression to support DataType
[ https://issues.apache.org/jira/browse/SPARK-41506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646895#comment-17646895 ] Apache Spark commented on SPARK-41506: -- User 'zhengruifeng' has created a pull request for this issue: https://github.com/apache/spark/pull/39056 > Refactor LiteralExpression to support DataType > -- > > Key: SPARK-41506 > URL: https://issues.apache.org/jira/browse/SPARK-41506 > Project: Spark > Issue Type: Sub-task > Components: Connect, PySpark >Affects Versions: 3.4.0 >Reporter: Ruifeng Zheng >Assignee: Ruifeng Zheng >Priority: Major > Fix For: 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-41506) Refactor LiteralExpression to support DataType
[ https://issues.apache.org/jira/browse/SPARK-41506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646890#comment-17646890 ] Apache Spark commented on SPARK-41506: -- User 'HyukjinKwon' has created a pull request for this issue: https://github.com/apache/spark/pull/39055 > Refactor LiteralExpression to support DataType > -- > > Key: SPARK-41506 > URL: https://issues.apache.org/jira/browse/SPARK-41506 > Project: Spark > Issue Type: Sub-task > Components: Connect, PySpark >Affects Versions: 3.4.0 >Reporter: Ruifeng Zheng >Assignee: Ruifeng Zheng >Priority: Major > Fix For: 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-41506) Refactor LiteralExpression to support DataType
[ https://issues.apache.org/jira/browse/SPARK-41506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon reassigned SPARK-41506: Assignee: Ruifeng Zheng > Refactor LiteralExpression to support DataType > -- > > Key: SPARK-41506 > URL: https://issues.apache.org/jira/browse/SPARK-41506 > Project: Spark > Issue Type: Sub-task > Components: Connect, PySpark >Affects Versions: 3.4.0 >Reporter: Ruifeng Zheng >Assignee: Ruifeng Zheng >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-41506) Refactor LiteralExpression to support DataType
[ https://issues.apache.org/jira/browse/SPARK-41506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-41506. -- Fix Version/s: 3.4.0 Resolution: Fixed Issue resolved by pull request 39047 [https://github.com/apache/spark/pull/39047] > Refactor LiteralExpression to support DataType > -- > > Key: SPARK-41506 > URL: https://issues.apache.org/jira/browse/SPARK-41506 > Project: Spark > Issue Type: Sub-task > Components: Connect, PySpark >Affects Versions: 3.4.0 >Reporter: Ruifeng Zheng >Assignee: Ruifeng Zheng >Priority: Major > Fix For: 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646825#comment-17646825 ] Mridul Muralidharan edited comment on SPARK-41497 at 12/13/22 9:20 PM: --- > For example, a task is constructed by `rdd1.cache().rdd2`. So if the task > fails due to rdd2's computation, I think rdd1's cache should still be able to > reuse. We have three cases here. For some initial task T1: a) Computation of rdd2 within T1 should have no issues, since initial computation would result in caching it locally - and rdd2 computation is using the result of that local read iterator [1]. b) Failure of T1 and now T2 executes for the same partition - as proposed, T2 will not use result of T1's cache, since it not marked as usable (even though the block might be still marked cached [2]). c) Replication and/or decom and T2 runs - same case as (b). [1] We should special case and allow reads from the same task which cached the block - even if it has not yet been marked usable. [2] When T1 failed, we would drop the blocks which got cached due to it. But even in the case of a race, the flag prevents the use of the cached block. was (Author: mridulm80): > For example, a task is constructed by `rdd1.cache().rdd2`. So if the task > fails due to rdd2's computation, I think rdd1's cache should still be able to > reuse. We have three cases here. For some initial task T1: a) Computation of rdd2 within T1 should have no issues, since initial computation would result in caching it locally - and rdd2 computation is using the result of that iterator [1]. b) Failure of T1 and now T2 executes for the same partition - as proposed, T2 will not use result of T1's cache, since it not marked as usable (even though the block might be still marked cached [2]). c) Replication and/or decom and T2 runs - same case as (b). [1] We should special case and allow reads from the same task which cached the block - even if it has not yet been marked usable. 
[2] When T1 failed, we would drop the blocks which got cached due to it. But even in the case of a race, the flag prevents the use of the cached block. > Accumulator undercounting in the case of retry task with rdd cache > -- > > Key: SPARK-41497 > URL: https://issues.apache.org/jira/browse/SPARK-41497 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1 >Reporter: wuyi >Priority: Major > > Accumulator could be undercounted when the retried task has rdd cache. See > the example below and you could also find the completed and reproducible > example at > [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc] > > {code:scala} > test("SPARK-XXX") { > // Set up a cluster with 2 executors > val conf = new SparkConf() > .setMaster("local-cluster[2, 1, > 1024]").setAppName("TaskSchedulerImplSuite") > sc = new SparkContext(conf) > // Set up a custom task scheduler. The scheduler will fail the first task > attempt of the job > // submitted below. In particular, the failed first attempt task would > success on computation > // (accumulator accounting, result caching) but only fail to report its > success status due > // to the concurrent executor lost. The second task attempt would success. > taskScheduler = setupSchedulerWithCustomStatusUpdate(sc) > val myAcc = sc.longAccumulator("myAcc") > // Initiate a rdd with only one partition so there's only one task and > specify the storage level > // with MEMORY_ONLY_2 so that the rdd result will be cached on both two > executors. > val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter => > myAcc.add(100) > iter.map(x => x + 1) > }.persist(StorageLevel.MEMORY_ONLY_2) > // This will pass since the second task attempt will succeed > assert(rdd.count() === 10) > // This will fail due to `myAcc.add(100)` won't be executed during the > second task attempt's > // execution. 
Because the second task attempt will load the rdd cache > directly instead of > // executing the task function so `myAcc.add(100)` is skipped. > assert(myAcc.value === 100) > } {code} > > We could also hit this issue with decommission even if the rdd only has one > copy. For example, decommission could migrate the rdd cache block to another > executor (the result is actually the same with 2 copies) and the > decommissioned executor lost before the task reports its success status to > the driver. > > And the issue is a bit more complicated than expected to fix. I have tried to > give some fixes but all of them are not ideal: > Option 1: Clean up any rdd cache related to the failed task: in practice, > this option can already fix the issue in
[jira] [Comment Edited] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646825#comment-17646825 ] Mridul Muralidharan edited comment on SPARK-41497 at 12/13/22 9:20 PM: --- > For example, a task is constructed by `rdd1.cache().rdd2`. So if the task > fails due to rdd2's computation, I think rdd1's cache should still be able to > reuse. We have three cases here. For some initial task T1: a) Computation of rdd2 within T1 should have no issues, since initial computation would result in caching it locally - and rdd2 computation is using the result of that local read iterator [1]. b) Failure of T1 and now T2 executes for the same partition - as proposed, T2 will not use result of T1's cache, since it not marked as usable (even though the block might be still marked cached [2]). c) Replication and/or decom and T2 runs - same case as (b). Probably 'usable' is incorrect term - 'visible' might be better ? That is, is this block visible to others (outside of the generating task). [1] We should special case and allow reads from the same task which cached the block - even if it has not yet been marked usable. [2] When T1 failed, we would drop the blocks which got cached due to it. But even in the case of a race, the flag prevents the use of the cached block. was (Author: mridulm80): > For example, a task is constructed by `rdd1.cache().rdd2`. So if the task > fails due to rdd2's computation, I think rdd1's cache should still be able to > reuse. We have three cases here. For some initial task T1: a) Computation of rdd2 within T1 should have no issues, since initial computation would result in caching it locally - and rdd2 computation is using the result of that local read iterator [1]. b) Failure of T1 and now T2 executes for the same partition - as proposed, T2 will not use result of T1's cache, since it not marked as usable (even though the block might be still marked cached [2]). 
c) Replication and/or decom and T2 runs - same case as (b). [1] We should special case and allow reads from the same task which cached the block - even if it has not yet been marked usable. [2] When T1 failed, we would drop the blocks which got cached due to it. But even in the case of a race, the flag prevents the use of the cached block. > Accumulator undercounting in the case of retry task with rdd cache > -- > > Key: SPARK-41497 > URL: https://issues.apache.org/jira/browse/SPARK-41497 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1 >Reporter: wuyi >Priority: Major > > Accumulator could be undercounted when the retried task has rdd cache. See > the example below and you could also find the completed and reproducible > example at > [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc] > > {code:scala} > test("SPARK-XXX") { > // Set up a cluster with 2 executors > val conf = new SparkConf() > .setMaster("local-cluster[2, 1, > 1024]").setAppName("TaskSchedulerImplSuite") > sc = new SparkContext(conf) > // Set up a custom task scheduler. The scheduler will fail the first task > attempt of the job > // submitted below. In particular, the failed first attempt task would > success on computation > // (accumulator accounting, result caching) but only fail to report its > success status due > // to the concurrent executor lost. The second task attempt would success. > taskScheduler = setupSchedulerWithCustomStatusUpdate(sc) > val myAcc = sc.longAccumulator("myAcc") > // Initiate a rdd with only one partition so there's only one task and > specify the storage level > // with MEMORY_ONLY_2 so that the rdd result will be cached on both two > executors. 
> val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter => > myAcc.add(100) > iter.map(x => x + 1) > }.persist(StorageLevel.MEMORY_ONLY_2) > // This will pass since the second task attempt will succeed > assert(rdd.count() === 10) > // This will fail due to `myAcc.add(100)` won't be executed during the > second task attempt's > // execution. Because the second task attempt will load the rdd cache > directly instead of > // executing the task function so `myAcc.add(100)` is skipped. > assert(myAcc.value === 100) > } {code} > > We could also hit this issue with decommission even if the rdd only has one > copy. For example, decommission could migrate the rdd cache block to another > executor (the result is actually the same with 2 copies) and the > decommissioned executor lost before the task reports its success status to > the driver. > > And the issue is a bit more complicated than expected to fix. I have tried to > give some fixes
[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646825#comment-17646825 ] Mridul Muralidharan commented on SPARK-41497: - > For example, a task is constructed by `rdd1.cache().rdd2`. So if the task > fails due to rdd2's computation, I think rdd1's cache should still be able to > reuse. We have three cases here. For some initial task T1: a) Computation of rdd2 within T1 should have no issues, since initial computation would result in caching it locally - and rdd2 computation is using the result of that iterator [1]. b) Failure of T1 and now T2 executes for the same partition - as proposed, T2 will not use result of T1's cache, since it not marked as usable (even though the block might be still marked cached [2]). c) Replication and/or decom and T2 runs - same case as (b). [1] We should special case and allow reads from the same task which cached the block - even if it has not yet been marked usable. [2] When T1 failed, we would drop the blocks which got cached due to it. But even in the case of a race, the flag prevents the use of the cached block. > Accumulator undercounting in the case of retry task with rdd cache > -- > > Key: SPARK-41497 > URL: https://issues.apache.org/jira/browse/SPARK-41497 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1 >Reporter: wuyi >Priority: Major > > Accumulator could be undercounted when the retried task has rdd cache. See > the example below and you could also find the completed and reproducible > example at > [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc] > > {code:scala} > test("SPARK-XXX") { > // Set up a cluster with 2 executors > val conf = new SparkConf() > .setMaster("local-cluster[2, 1, > 1024]").setAppName("TaskSchedulerImplSuite") > sc = new SparkContext(conf) > // Set up a custom task scheduler. 
The scheduler will fail the first task > attempt of the job > // submitted below. In particular, the failed first attempt task would > success on computation > // (accumulator accounting, result caching) but only fail to report its > success status due > // to the concurrent executor lost. The second task attempt would success. > taskScheduler = setupSchedulerWithCustomStatusUpdate(sc) > val myAcc = sc.longAccumulator("myAcc") > // Initiate a rdd with only one partition so there's only one task and > specify the storage level > // with MEMORY_ONLY_2 so that the rdd result will be cached on both two > executors. > val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter => > myAcc.add(100) > iter.map(x => x + 1) > }.persist(StorageLevel.MEMORY_ONLY_2) > // This will pass since the second task attempt will succeed > assert(rdd.count() === 10) > // This will fail due to `myAcc.add(100)` won't be executed during the > second task attempt's > // execution. Because the second task attempt will load the rdd cache > directly instead of > // executing the task function so `myAcc.add(100)` is skipped. > assert(myAcc.value === 100) > } {code} > > We could also hit this issue with decommission even if the rdd only has one > copy. For example, decommission could migrate the rdd cache block to another > executor (the result is actually the same with 2 copies) and the > decommissioned executor lost before the task reports its success status to > the driver. > > And the issue is a bit more complicated than expected to fix. I have tried to > give some fixes but all of them are not ideal: > Option 1: Clean up any rdd cache related to the failed task: in practice, > this option can already fix the issue in most cases. However, theoretically, > rdd cache could be reported to the driver right after the driver cleans up > the failed task's caches due to asynchronous communication. 
So this option > can’t resolve the issue thoroughly; > Option 2: Disallow rdd cache reuse across the task attempts for the same > task: this option can 100% fix the issue. The problem is this way can also > affect the case where rdd cache can be reused across the attempts (e.g., when > there is no accumulator operation in the task), which can have perf > regression; > Option 3: Introduce accumulator cache: first, this requires a new framework > for supporting accumulator cache; second, the driver should improve its logic > to distinguish whether the accumulator cache value should be reported to the > user to avoid overcounting. For example, in the case of task retry, the value > should be reported. However, in the case of rdd cache reuse, the value > shouldn’t be reported (should it?); > Option 4: Do task success validation when a task trying to
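The "usable/visible" flag discussed in the comments above (a block readable by its writing task immediately, but by other tasks only after the writer is known to have succeeded) can be sketched roughly as follows. This is a minimal illustration of the idea only, not Spark's BlockManager API; all names are hypothetical:

```python
class CachedBlock:
    """A cached RDD block that only becomes visible to other tasks
    once the task that wrote it is confirmed to have succeeded."""

    def __init__(self, writer_task_id, data):
        self.writer_task_id = writer_task_id
        self.data = data
        self.visible = False  # not yet confirmed by the driver

    def readable_by(self, task_id):
        # Special case: the writing task may read its own block even
        # before it is marked visible (case [1] in the comment above).
        return self.visible or task_id == self.writer_task_id

    def mark_task_succeeded(self):
        # Called once the driver confirms the writer task's success.
        self.visible = True

# T1 caches the block but its success report is lost:
block = CachedBlock(writer_task_id="T1", data=[1, 2, 3])
assert block.readable_by("T1")       # T1 can use its own cache
assert not block.readable_by("T2")   # retry T2 must recompute (and
                                     # therefore re-runs the accumulator)
block.mark_task_succeeded()
assert block.readable_by("T2")       # now visible to everyone
```

The point of the flag is exactly the accumulator scenario in the issue description: a retry task that cannot see the unconfirmed cache re-executes the task function, so `myAcc.add(100)` is not skipped.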
[jira] [Commented] (SPARK-27561) Support "lateral column alias references" to allow column aliases to be used within SELECT clauses
[ https://issues.apache.org/jira/browse/SPARK-27561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646823#comment-17646823 ] Apache Spark commented on SPARK-27561: -- User 'gengliangwang' has created a pull request for this issue: https://github.com/apache/spark/pull/39054 > Support "lateral column alias references" to allow column aliases to be used > within SELECT clauses > -- > > Key: SPARK-27561 > URL: https://issues.apache.org/jira/browse/SPARK-27561 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 3.1.0 >Reporter: Josh Rosen >Assignee: Xinyi Yu >Priority: Major > Fix For: 3.4.0 > > > Amazon Redshift has a feature called "lateral column alias references": > [https://aws.amazon.com/about-aws/whats-new/2018/08/amazon-redshift-announces-support-for-lateral-column-alias-reference/]. > Quoting from that blogpost: > {quote}The support for lateral column alias reference enables you to write > queries without repeating the same expressions in the SELECT list. For > example, you can define the alias 'probability' and use it within the same > select statement: > {code:java} > select clicks / impressions as probability, round(100 * probability, 1) as > percentage from raw_data; > {code} > {quote} > There's more information about this feature on > [https://docs.aws.amazon.com/redshift/latest/dg/r_SELECT_list.html:] > {quote}The benefit of the lateral alias reference is you don't need to repeat > the aliased expression when building more complex expressions in the same > target list. When Amazon Redshift parses this type of reference, it just > inlines the previously defined aliases. If there is a column with the same > name defined in the FROM clause as the previously aliased expression, the > column in the FROM clause takes priority. 
For example, in the above query if > there is a column named 'probability' in table raw_data, the 'probability' in > the second expression in the target list will refer to that column instead of > the alias name 'probability'. > {quote} > It would be nice if Spark supported this syntax. I don't think that this is > standard SQL, so it might be a good idea to research if other SQL databases > support similar syntax (and to see if they implement the same column > resolution strategy as Redshift). > We should also consider whether this needs to be feature-flagged as part of a > specific SQL compatibility mode / dialect. > One possibly-related existing ticket: SPARK-9338, which discusses the use of > SELECT aliases in GROUP BY expressions. > /cc [~hvanhovell] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
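The inlining behavior Redshift describes, including FROM-clause columns taking priority over a previously defined alias, can be sketched as a simple select-list rewrite. This is an illustrative sketch only, not Spark's analyzer; the naive regex substitution here stands in for real expression-tree resolution:

```python
import re

def inline_lateral_aliases(select_exprs, from_columns):
    """Rewrite each (expression, alias) pair in order, inlining
    previously defined aliases -- unless a FROM-clause column of the
    same name exists, in which case the column takes priority."""
    aliases = {}
    resolved = []
    for expr, alias in select_exprs:
        for name, definition in aliases.items():
            # FROM-clause columns shadow lateral aliases.
            if name not in from_columns:
                expr = re.sub(rf"\b{name}\b", f"({definition})", expr)
        resolved.append((expr, alias))
        if alias:
            aliases[alias] = expr
    return resolved

# The blogpost's example: 'probability' is inlined into 'percentage'.
exprs = [("clicks / impressions", "probability"),
         ("round(100 * probability, 1)", "percentage")]
print(inline_lateral_aliases(exprs, from_columns={"clicks", "impressions"}))
```

If `raw_data` also had a column named `probability`, passing it in `from_columns` would leave the second expression untouched, matching Redshift's resolution priority.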
[jira] [Assigned] (SPARK-41062) Rename UNSUPPORTED_CORRELATED_REFERENCE to CORRELATED_REFERENCE
[ https://issues.apache.org/jira/browse/SPARK-41062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Max Gekk reassigned SPARK-41062: Assignee: Haejoon Lee > Rename UNSUPPORTED_CORRELATED_REFERENCE to CORRELATED_REFERENCE > --- > > Key: SPARK-41062 > URL: https://issues.apache.org/jira/browse/SPARK-41062 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: Haejoon Lee >Assignee: Haejoon Lee >Priority: Major > > We should use clear and brief name for every error class. > This sub-error class duplicates the main class. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-41062) Rename UNSUPPORTED_CORRELATED_REFERENCE to CORRELATED_REFERENCE
[ https://issues.apache.org/jira/browse/SPARK-41062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Max Gekk resolved SPARK-41062. -- Fix Version/s: 3.4.0 Resolution: Fixed Issue resolved by pull request 38576 [https://github.com/apache/spark/pull/38576] > Rename UNSUPPORTED_CORRELATED_REFERENCE to CORRELATED_REFERENCE > --- > > Key: SPARK-41062 > URL: https://issues.apache.org/jira/browse/SPARK-41062 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: Haejoon Lee >Assignee: Haejoon Lee >Priority: Major > Fix For: 3.4.0 > > > We should use clear and brief name for every error class. > This sub-error class duplicates the main class. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-41482) Upgrade dropwizard metrics 4.2.13
[ https://issues.apache.org/jira/browse/SPARK-41482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun resolved SPARK-41482. --- Fix Version/s: 3.4.0 Resolution: Fixed Issue resolved by pull request 39026 [https://github.com/apache/spark/pull/39026] > Upgrade dropwizard metrics 4.2.13 > - > > Key: SPARK-41482 > URL: https://issues.apache.org/jira/browse/SPARK-41482 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 3.4.0 >Reporter: Yang Jie >Assignee: Yang Jie >Priority: Minor > Fix For: 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-41482) Upgrade dropwizard metrics 4.2.13
[ https://issues.apache.org/jira/browse/SPARK-41482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun reassigned SPARK-41482: - Assignee: Yang Jie > Upgrade dropwizard metrics 4.2.13 > - > > Key: SPARK-41482 > URL: https://issues.apache.org/jira/browse/SPARK-41482 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 3.4.0 >Reporter: Yang Jie >Assignee: Yang Jie >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers
[ https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646734#comment-17646734 ] Ohad Raviv commented on SPARK-41510: the conda solution is more for a "static" packages. the scenario is this: we're developing a python library (more than one .py file) and want to do it interactively in the notebooks. so we have all the modules in some folder and we add this folder to the driver's sys.path. Then, if we for example use a function from the module inside a UDF, we get: "ModuleNotFoundError: No module named 'some_module' ". The reason is that some_module is not in the PYTHONPATH/sys.path of the workers. the code itself is accessible to the workers for example in a shared NFS folder. so all we now need is to add the path. we can do it inside the UDF something like: ``` if "/shared_nfs/my_folder" not in sys.path: sys.path.insert(0, "/shared_nfs/my_folder") ``` but that is both very ugly and only a partial solution as it works only in UDF case. the suggestion is to have some kind of mechanism to easily add a folder to the workers' sys.path. the option of wrapping the code in zip/egg and add it makes a very long development cycle and requires restarting the spark session and the Notebook to lose its state. with the suggestion above we could actually edit the python package interactively and see the changes almost immediately. hope it is clearer now. > Support easy way for user defined PYTHONPATH in workers > --- > > Key: SPARK-41510 > URL: https://issues.apache.org/jira/browse/SPARK-41510 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.3.1 >Reporter: Ohad Raviv >Priority: Minor > > When working interactively with Spark through notebooks in various envs - > Databricks/YARN I often encounter a very frustrating process of trying to add > new python modules and even change their code without starting a new spark > session/cluster. 
> In the driver side it is easy to add things like `sys.path.append()` but if > for example, if a UDF code is importing a function from a local module, then > the pickle boundaries will assume that the module exists in the workers, and > fail on "python module does not exist..". > To update the code "online" I can add NFS volume to the workers' PYTHONPATH. > However, setting the PYTHONPATH in the workers is not easy as it gets > overridden by someone (databricks/spark) along the way. a few ugly > workarounds are suggested like running a "dummy" UDF on the workers to add > the folder to the sys.path. > I think all of that could easily be solved if we just add a dedicated > `spark.conf` the will get merged into the worker's PYTHONPATH, just here: > [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94] > > please tell me what you think, and I will make the PR. > thanks. > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
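Until a dedicated conf exists, the per-UDF workaround quoted in the comment above can at least be factored into one idempotent helper instead of being repeated inside every UDF. A sketch under stated assumptions: `spark`/`sc` would be an active session, and the NFS path is an example:

```python
import sys

def ensure_on_path(folder):
    """Idempotently prepend a folder to sys.path -- the same snippet
    the comment above embeds in each UDF, factored into one helper."""
    if folder not in sys.path:
        sys.path.insert(0, folder)
    return folder in sys.path

# In Spark this could be pushed to every worker once per session, e.g.:
#   sc.parallelize(range(sc.defaultParallelism), sc.defaultParallelism) \
#     .foreach(lambda _: ensure_on_path("/shared_nfs/my_folder"))
# (illustrative only; the path and the trigger mechanism are examples,
#  and this still shares the "dummy job" ugliness the comment mentions)
print(ensure_on_path("/shared_nfs/my_folder"))  # True
```

This is still a workaround, not the proposed fix: a `spark.conf` merged into the workers' PYTHONPATH at worker startup (as suggested in the issue) would cover non-UDF code paths too.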
[jira] [Assigned] (SPARK-27561) Support "lateral column alias references" to allow column aliases to be used within SELECT clauses
[ https://issues.apache.org/jira/browse/SPARK-27561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-27561: --- Assignee: Xinyi Yu > Support "lateral column alias references" to allow column aliases to be used > within SELECT clauses > -- > > Key: SPARK-27561 > URL: https://issues.apache.org/jira/browse/SPARK-27561 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 3.1.0 >Reporter: Josh Rosen >Assignee: Xinyi Yu >Priority: Major > Fix For: 3.4.0 > > > Amazon Redshift has a feature called "lateral column alias references": > [https://aws.amazon.com/about-aws/whats-new/2018/08/amazon-redshift-announces-support-for-lateral-column-alias-reference/]. > Quoting from that blogpost: > {quote}The support for lateral column alias reference enables you to write > queries without repeating the same expressions in the SELECT list. For > example, you can define the alias 'probability' and use it within the same > select statement: > {code:java} > select clicks / impressions as probability, round(100 * probability, 1) as > percentage from raw_data; > {code} > {quote} > There's more information about this feature on > [https://docs.aws.amazon.com/redshift/latest/dg/r_SELECT_list.html:] > {quote}The benefit of the lateral alias reference is you don't need to repeat > the aliased expression when building more complex expressions in the same > target list. When Amazon Redshift parses this type of reference, it just > inlines the previously defined aliases. If there is a column with the same > name defined in the FROM clause as the previously aliased expression, the > column in the FROM clause takes priority. For example, in the above query if > there is a column named 'probability' in table raw_data, the 'probability' in > the second expression in the target list will refer to that column instead of > the alias name 'probability'. > {quote} > It would be nice if Spark supported this syntax. 
I don't think that this is > standard SQL, so it might be a good idea to research if other SQL databases > support similar syntax (and to see if they implement the same column > resolution strategy as Redshift). > We should also consider whether this needs to be feature-flagged as part of a > specific SQL compatibility mode / dialect. > One possibly-related existing ticket: SPARK-9338, which discusses the use of > SELECT aliases in GROUP BY expressions. > /cc [~hvanhovell] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-27561) Support "lateral column alias references" to allow column aliases to be used within SELECT clauses
[ https://issues.apache.org/jira/browse/SPARK-27561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-27561. - Fix Version/s: 3.4.0 Resolution: Fixed Issue resolved by pull request 38776 [https://github.com/apache/spark/pull/38776] > Support "lateral column alias references" to allow column aliases to be used > within SELECT clauses > -- > > Key: SPARK-27561 > URL: https://issues.apache.org/jira/browse/SPARK-27561 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 3.1.0 >Reporter: Josh Rosen >Priority: Major > Fix For: 3.4.0 > > > Amazon Redshift has a feature called "lateral column alias references": > [https://aws.amazon.com/about-aws/whats-new/2018/08/amazon-redshift-announces-support-for-lateral-column-alias-reference/]. > Quoting from that blogpost: > {quote}The support for lateral column alias reference enables you to write > queries without repeating the same expressions in the SELECT list. For > example, you can define the alias 'probability' and use it within the same > select statement: > {code:java} > select clicks / impressions as probability, round(100 * probability, 1) as > percentage from raw_data; > {code} > {quote} > There's more information about this feature on > [https://docs.aws.amazon.com/redshift/latest/dg/r_SELECT_list.html:] > {quote}The benefit of the lateral alias reference is you don't need to repeat > the aliased expression when building more complex expressions in the same > target list. When Amazon Redshift parses this type of reference, it just > inlines the previously defined aliases. If there is a column with the same > name defined in the FROM clause as the previously aliased expression, the > column in the FROM clause takes priority. For example, in the above query if > there is a column named 'probability' in table raw_data, the 'probability' in > the second expression in the target list will refer to that column instead of > the alias name 'probability'. 
> {quote} > It would be nice if Spark supported this syntax. I don't think that this is > standard SQL, so it might be a good idea to research if other SQL databases > support similar syntax (and to see if they implement the same column > resolution strategy as Redshift). > We should also consider whether this needs to be feature-flagged as part of a > specific SQL compatibility mode / dialect. > One possibly-related existing ticket: SPARK-9338, which discusses the use of > SELECT aliases in GROUP BY expressions. > /cc [~hvanhovell] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
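The Redshift resolution rule quoted above (inline a previously defined alias, but let a real FROM-clause column of the same name take priority) can be sketched outside of Spark. This toy resolver is purely illustrative; it treats expressions as plain strings and is not how Spark's analyzer works:

```python
import re

def resolve_select_list(exprs, from_columns):
    """Inline lateral column aliases in a SELECT list.

    exprs: list of (expression, alias) pairs, in SELECT order.
    from_columns: set of column names available from the FROM clause.
    A FROM-clause column shadows an earlier alias of the same name,
    matching the Redshift behavior described above.
    """
    aliases = {}
    resolved = []
    for expr, alias in exprs:
        def substitute(match):
            name = match.group(0)
            if name in from_columns:   # a real column wins over an alias
                return name
            if name in aliases:        # inline the earlier aliased expression
                return "(" + aliases[name] + ")"
            return name                # function names, unknown identifiers
        inlined = re.sub(r"[A-Za-z_]\w*", substitute, expr)
        resolved.append((inlined, alias))
        if alias is not None:
            aliases[alias] = inlined
    return resolved

# The blogpost's example: 'percentage' reuses the 'probability' alias.
select = [("clicks / impressions", "probability"),
          ("round(100 * probability, 1)", "percentage")]
print(resolve_select_list(select, {"clicks", "impressions"}))
```

With this sketch, `percentage` resolves to `round(100 * (clicks / impressions), 1)`, and adding a real `probability` column to `from_columns` makes the reference bind to the column instead, mirroring the shadowing rule quoted from the Redshift docs.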
[jira] [Commented] (SPARK-39601) AllocationFailure should not be treated as exitCausedByApp when driver is shutting down
[ https://issues.apache.org/jira/browse/SPARK-39601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646682#comment-17646682 ] Apache Spark commented on SPARK-39601: -- User 'pan3793' has created a pull request for this issue: https://github.com/apache/spark/pull/39053 > AllocationFailure should not be treated as exitCausedByApp when driver is > shutting down > --- > > Key: SPARK-39601 > URL: https://issues.apache.org/jira/browse/SPARK-39601 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 3.3.0 >Reporter: Cheng Pan >Assignee: Cheng Pan >Priority: Major > Fix For: 3.4.0 > > > I observed some Spark Applications successfully completed all jobs but failed > during the shutting down phase w/ reason: Max number of executor failures > (16) reached, the timeline is > Driver - Job success, Spark starts shutting down procedure. > {code:java} > 2022-06-23 19:50:55 CST AbstractConnector INFO - Stopped > Spark@74e9431b{HTTP/1.1, (http/1.1)} > {0.0.0.0:0} > 2022-06-23 19:50:55 CST SparkUI INFO - Stopped Spark web UI at > http://hadoop2627.xxx.org:28446 > 2022-06-23 19:50:55 CST YarnClusterSchedulerBackend INFO - Shutting down all > executors > {code} > Driver - A container allocate successful during shutting down phase. > {code:java} > 2022-06-23 19:52:21 CST YarnAllocator INFO - Launching container > container_e94_1649986670278_7743380_02_25 on host hadoop4388.xxx.org for > executor with ID 24 for ResourceProfile Id 0{code} > Executor - The executor can not connect to driver endpoint because driver > already stopped the endpoint. 
> {code:java} > Exception in thread "main" java.lang.reflect.UndeclaredThrowableException > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1911) > at > org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61) > at > org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:393) > at > org.apache.spark.executor.YarnCoarseGrainedExecutorBackend$.main(YarnCoarseGrainedExecutorBackend.scala:81) > at > org.apache.spark.executor.YarnCoarseGrainedExecutorBackend.main(YarnCoarseGrainedExecutorBackend.scala) > Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: > at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301) > at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) > at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101) > at > org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$9(CoarseGrainedExecutorBackend.scala:413) > at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23) > at > scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877) > at scala.collection.immutable.Range.foreach(Range.scala:158) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876) > at > org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$7(CoarseGrainedExecutorBackend.scala:411) > at > org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:62) > at > org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893) > ... 
4 more > Caused by: org.apache.spark.rpc.RpcEndpointNotFoundException: Cannot find > endpoint: spark://coarsegrainedschedu...@hadoop2627.xxx.org:21956 > at > org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$asyncSetupEndpointRefByURI$1(NettyRpcEnv.scala:148) > at > org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$asyncSetupEndpointRefByURI$1$adapted(NettyRpcEnv.scala:144) > at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:307) > at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) > at org.apache.spark.util.ThreadUtils$$anon$1.execute(ThreadUtils.scala:99) > at > scala.concurrent.impl.ExecutionContextImpl$$anon$4.execute(ExecutionContextImpl.scala:138) > at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72) > at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288) > at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288) > at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288){code} > Driver - YarnAllocator received container launch error message and treat it > as
[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646658#comment-17646658 ] huangtengfei commented on SPARK-41497: -- I also think that options 3/4 (including the improved proposal [~mridulm80] suggested) would be promising; the issue could be resolved either from the accumulator side or the rdd cache side. Option 4 seems more straightforward since it complements the existing cache mechanism, and making the decision based on task status could be a feasible solution. As mentioned above, the downside is that it may be overkill; if such cases are low-probability events, maybe that is also acceptable. > Accumulator undercounting in the case of retry task with rdd cache > -- > > Key: SPARK-41497 > URL: https://issues.apache.org/jira/browse/SPARK-41497 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1 >Reporter: wuyi >Priority: Major > > Accumulator could be undercounted when the retried task has rdd cache. See > the example below and you could also find the completed and reproducible > example at > [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc] > > {code:scala} > test("SPARK-XXX") { > // Set up a cluster with 2 executors > val conf = new SparkConf() > .setMaster("local-cluster[2, 1, > 1024]").setAppName("TaskSchedulerImplSuite") > sc = new SparkContext(conf) > // Set up a custom task scheduler. The scheduler will fail the first task > attempt of the job > // submitted below. In particular, the failed first attempt task would > success on computation > // (accumulator accounting, result caching) but only fail to report its > success status due > // to the concurrent executor lost. The second task attempt would success. 
> taskScheduler = setupSchedulerWithCustomStatusUpdate(sc) > val myAcc = sc.longAccumulator("myAcc") > // Initiate a rdd with only one partition so there's only one task and > specify the storage level > // with MEMORY_ONLY_2 so that the rdd result will be cached on both two > executors. > val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter => > myAcc.add(100) > iter.map(x => x + 1) > }.persist(StorageLevel.MEMORY_ONLY_2) > // This will pass since the second task attempt will succeed > assert(rdd.count() === 10) > // This will fail due to `myAcc.add(100)` won't be executed during the > second task attempt's > // execution. Because the second task attempt will load the rdd cache > directly instead of > // executing the task function so `myAcc.add(100)` is skipped. > assert(myAcc.value === 100) > } {code} > > We could also hit this issue with decommission even if the rdd only has one > copy. For example, decommission could migrate the rdd cache block to another > executor (the result is actually the same with 2 copies) and the > decommissioned executor lost before the task reports its success status to > the driver. > > And the issue is a bit more complicated than expected to fix. I have tried to > give some fixes but all of them are not ideal: > Option 1: Clean up any rdd cache related to the failed task: in practice, > this option can already fix the issue in most cases. However, theoretically, > rdd cache could be reported to the driver right after the driver cleans up > the failed task's caches due to asynchronous communication. So this option > can’t resolve the issue thoroughly; > Option 2: Disallow rdd cache reuse across the task attempts for the same > task: this option can 100% fix the issue. 
The problem is this way can also > affect the case where rdd cache can be reused across the attempts (e.g., when > there is no accumulator operation in the task), which can have perf > regression; > Option 3: Introduce accumulator cache: first, this requires a new framework > for supporting accumulator cache; second, the driver should improve its logic > to distinguish whether the accumulator cache value should be reported to the > user to avoid overcounting. For example, in the case of task retry, the value > should be reported. However, in the case of rdd cache reuse, the value > shouldn’t be reported (should it?); > Option 4: Do task success validation when a task trying to load the rdd > cache: this way defines a rdd cache is only valid/accessible if the task has > succeeded. This way could be either overkill or a bit complex (because > currently Spark would clean up the task state once it’s finished. So we need > to maintain a structure to know if task once succeeded or not. ) -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail:
[jira] [Resolved] (SPARK-39601) AllocationFailure should not be treated as exitCausedByApp when driver is shutting down
[ https://issues.apache.org/jira/browse/SPARK-39601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Graves resolved SPARK-39601. --- Fix Version/s: 3.4.0 Assignee: Cheng Pan Resolution: Fixed > AllocationFailure should not be treated as exitCausedByApp when driver is > shutting down > --- > > Key: SPARK-39601 > URL: https://issues.apache.org/jira/browse/SPARK-39601 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 3.3.0 >Reporter: Cheng Pan >Assignee: Cheng Pan >Priority: Major > Fix For: 3.4.0 > > > I observed some Spark Applications successfully completed all jobs but failed > during the shutting down phase w/ reason: Max number of executor failures > (16) reached, the timeline is > Driver - Job success, Spark starts shutting down procedure. > {code:java} > 2022-06-23 19:50:55 CST AbstractConnector INFO - Stopped > Spark@74e9431b{HTTP/1.1, (http/1.1)} > {0.0.0.0:0} > 2022-06-23 19:50:55 CST SparkUI INFO - Stopped Spark web UI at > http://hadoop2627.xxx.org:28446 > 2022-06-23 19:50:55 CST YarnClusterSchedulerBackend INFO - Shutting down all > executors > {code} > Driver - A container allocate successful during shutting down phase. > {code:java} > 2022-06-23 19:52:21 CST YarnAllocator INFO - Launching container > container_e94_1649986670278_7743380_02_25 on host hadoop4388.xxx.org for > executor with ID 24 for ResourceProfile Id 0{code} > Executor - The executor can not connect to driver endpoint because driver > already stopped the endpoint. 
> {code:java} > Exception in thread "main" java.lang.reflect.UndeclaredThrowableException > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1911) > at > org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61) > at > org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:393) > at > org.apache.spark.executor.YarnCoarseGrainedExecutorBackend$.main(YarnCoarseGrainedExecutorBackend.scala:81) > at > org.apache.spark.executor.YarnCoarseGrainedExecutorBackend.main(YarnCoarseGrainedExecutorBackend.scala) > Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: > at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301) > at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) > at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101) > at > org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$9(CoarseGrainedExecutorBackend.scala:413) > at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23) > at > scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877) > at scala.collection.immutable.Range.foreach(Range.scala:158) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876) > at > org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$7(CoarseGrainedExecutorBackend.scala:411) > at > org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:62) > at > org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893) > ... 
4 more > Caused by: org.apache.spark.rpc.RpcEndpointNotFoundException: Cannot find > endpoint: spark://coarsegrainedschedu...@hadoop2627.xxx.org:21956 > at > org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$asyncSetupEndpointRefByURI$1(NettyRpcEnv.scala:148) > at > org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$asyncSetupEndpointRefByURI$1$adapted(NettyRpcEnv.scala:144) > at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:307) > at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) > at org.apache.spark.util.ThreadUtils$$anon$1.execute(ThreadUtils.scala:99) > at > scala.concurrent.impl.ExecutionContextImpl$$anon$4.execute(ExecutionContextImpl.scala:138) > at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72) > at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288) > at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288) > at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288){code} > Driver - YarnAllocator received container launch error message and treat it > as `exitCausedByApp` > {code:java} > 2022-06-23 19:52:27 CST YarnAllocator
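The fix direction in the issue title can be sketched as a guard in the allocator's failure accounting: once the driver has begun shutting down, container exits should no longer count toward the "max number of executor failures" budget. The class and flag names below are illustrative, not Spark's actual YarnAllocator internals:

```python
class AllocatorState:
    """Toy model of the proposed guard in executor-failure accounting."""
    def __init__(self, max_executor_failures):
        self.max_executor_failures = max_executor_failures
        self.failures_caused_by_app = 0
        self.driver_shutting_down = False

    def on_container_exit(self, exit_caused_by_app):
        # While the driver is shutting down, launch/allocation failures are
        # expected noise (the endpoint is already stopped), not app errors.
        if self.driver_shutting_down:
            exit_caused_by_app = False
        if exit_caused_by_app:
            self.failures_caused_by_app += 1
        # Returns True when the app should be failed for too many failures.
        return self.failures_caused_by_app > self.max_executor_failures
```

Under this sketch, the timeline in the report (job succeeds, shutdown starts, a late container launch fails to reach the stopped driver endpoint) never trips the failure budget, so the application finishes as successful.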
[jira] [Resolved] (SPARK-41478) Assign a name to the error class _LEGACY_ERROR_TEMP_1234
[ https://issues.apache.org/jira/browse/SPARK-41478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Max Gekk resolved SPARK-41478. -- Fix Version/s: 3.4.0 Resolution: Fixed Issue resolved by pull request 39018 [https://github.com/apache/spark/pull/39018] > Assign a name to the error class _LEGACY_ERROR_TEMP_1234 > > > Key: SPARK-41478 > URL: https://issues.apache.org/jira/browse/SPARK-41478 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: BingKun Pan >Assignee: BingKun Pan >Priority: Minor > Fix For: 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-41478) Assign a name to the error class _LEGACY_ERROR_TEMP_1234
[ https://issues.apache.org/jira/browse/SPARK-41478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Max Gekk reassigned SPARK-41478: Assignee: BingKun Pan > Assign a name to the error class _LEGACY_ERROR_TEMP_1234 > > > Key: SPARK-41478 > URL: https://issues.apache.org/jira/browse/SPARK-41478 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: BingKun Pan >Assignee: BingKun Pan >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646639#comment-17646639 ] wuyi commented on SPARK-41497: -- [~mridulm80] That sounds like a better idea than option 4, but I think it still doesn't work well for a case like this: a task is constructed from `rdd1.cache().rdd2`, so if the task fails during rdd2's computation, rdd1's cache should still be reusable. > Accumulator undercounting in the case of retry task with rdd cache > -- > > Key: SPARK-41497 > URL: https://issues.apache.org/jira/browse/SPARK-41497 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1 >Reporter: wuyi >Priority: Major > > Accumulator could be undercounted when the retried task has rdd cache. See > the example below and you could also find the completed and reproducible > example at > [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc] > > {code:scala} > test("SPARK-XXX") { > // Set up a cluster with 2 executors > val conf = new SparkConf() > .setMaster("local-cluster[2, 1, > 1024]").setAppName("TaskSchedulerImplSuite") > sc = new SparkContext(conf) > // Set up a custom task scheduler. The scheduler will fail the first task > attempt of the job > // submitted below. In particular, the failed first attempt task would > success on computation > // (accumulator accounting, result caching) but only fail to report its > success status due > // to the concurrent executor lost. The second task attempt would success. > taskScheduler = setupSchedulerWithCustomStatusUpdate(sc) > val myAcc = sc.longAccumulator("myAcc") > // Initiate a rdd with only one partition so there's only one task and > specify the storage level > // with MEMORY_ONLY_2 so that the rdd result will be cached on both two > executors. 
> val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter => > myAcc.add(100) > iter.map(x => x + 1) > }.persist(StorageLevel.MEMORY_ONLY_2) > // This will pass since the second task attempt will succeed > assert(rdd.count() === 10) > // This will fail due to `myAcc.add(100)` won't be executed during the > second task attempt's > // execution. Because the second task attempt will load the rdd cache > directly instead of > // executing the task function so `myAcc.add(100)` is skipped. > assert(myAcc.value === 100) > } {code} > > We could also hit this issue with decommission even if the rdd only has one > copy. For example, decommission could migrate the rdd cache block to another > executor (the result is actually the same with 2 copies) and the > decommissioned executor lost before the task reports its success status to > the driver. > > And the issue is a bit more complicated than expected to fix. I have tried to > give some fixes but all of them are not ideal: > Option 1: Clean up any rdd cache related to the failed task: in practice, > this option can already fix the issue in most cases. However, theoretically, > rdd cache could be reported to the driver right after the driver cleans up > the failed task's caches due to asynchronous communication. So this option > can’t resolve the issue thoroughly; > Option 2: Disallow rdd cache reuse across the task attempts for the same > task: this option can 100% fix the issue. The problem is this way can also > affect the case where rdd cache can be reused across the attempts (e.g., when > there is no accumulator operation in the task), which can have perf > regression; > Option 3: Introduce accumulator cache: first, this requires a new framework > for supporting accumulator cache; second, the driver should improve its logic > to distinguish whether the accumulator cache value should be reported to the > user to avoid overcounting. For example, in the case of task retry, the value > should be reported. 
However, in the case of rdd cache reuse, the value > shouldn’t be reported (should it?); > Option 4: Do task success validation when a task trying to load the rdd > cache: this way defines a rdd cache is only valid/accessible if the task has > succeeded. This way could be either overkill or a bit complex (because > currently Spark would clean up the task state once it’s finished. So we need > to maintain a structure to know if task once succeeded or not. ) -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache
[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wuyi updated SPARK-41497: - Description: Accumulator could be undercounted when the retried task has rdd cache. See the example below and you could also find the completed and reproducible example at [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc] {code:scala} test("SPARK-XXX") { // Set up a cluster with 2 executors val conf = new SparkConf() .setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite") sc = new SparkContext(conf) // Set up a custom task scheduler. The scheduler will fail the first task attempt of the job // submitted below. In particular, the failed first attempt task would success on computation // (accumulator accounting, result caching) but only fail to report its success status due // to the concurrent executor lost. The second task attempt would success. taskScheduler = setupSchedulerWithCustomStatusUpdate(sc) val myAcc = sc.longAccumulator("myAcc") // Initiate a rdd with only one partition so there's only one task and specify the storage level // with MEMORY_ONLY_2 so that the rdd result will be cached on both two executors. val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter => myAcc.add(100) iter.map(x => x + 1) }.persist(StorageLevel.MEMORY_ONLY_2) // This will pass since the second task attempt will succeed assert(rdd.count() === 10) // This will fail due to `myAcc.add(100)` won't be executed during the second task attempt's // execution. Because the second task attempt will load the rdd cache directly instead of // executing the task function so `myAcc.add(100)` is skipped. assert(myAcc.value === 100) } {code} We could also hit this issue with decommission even if the rdd only has one copy. 
For example, decommission could migrate the rdd cache block to another executor (the result is actually the same with 2 copies) and the decommissioned executor lost before the task reports its success status to the driver. And the issue is a bit more complicated than expected to fix. I have tried to give some fixes but all of them are not ideal: Option 1: Clean up any rdd cache related to the failed task: in practice, this option can already fix the issue in most cases. However, theoretically, rdd cache could be reported to the driver right after the driver cleans up the failed task's caches due to asynchronous communication. So this option can’t resolve the issue thoroughly; Option 2: Disallow rdd cache reuse across the task attempts for the same task: this option can 100% fix the issue. The problem is this way can also affect the case where rdd cache can be reused across the attempts (e.g., when there is no accumulator operation in the task), which can have perf regression; Option 3: Introduce accumulator cache: first, this requires a new framework for supporting accumulator cache; second, the driver should improve its logic to distinguish whether the accumulator cache value should be reported to the user to avoid overcounting. For example, in the case of task retry, the value should be reported. However, in the case of rdd cache reuse, the value shouldn’t be reported (should it?); Option 4: Do task success validation when a task trying to load the rdd cache: this way defines a rdd cache is only valid/accessible if the task has succeeded. This way could be either overkill or a bit complex (because currently Spark would clean up the task state once it’s finished. So we need to maintain a structure to know if task once succeeded or not. ) was: Accumulator could be undercounted when the retried task has rdd cache. 
See the example below and you could also find the completed and reproducible example at [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc] {code:scala} test("SPARK-XXX") { // Set up a cluster with 2 executors val conf = new SparkConf() .setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite") sc = new SparkContext(conf) // Set up a custom task scheduler. The scheduler will fail the first task attempt of the job // submitted below. In particular, the failed first attempt task would success on computation // (accumulator accounting, result caching) but only fail to report its success status due // to the concurrent executor lost. The second task attempt would success. taskScheduler = setupSchedulerWithCustomStatusUpdate(sc) val myAcc = sc.longAccumulator("myAcc") // Initiate a rdd with only one partition so there's only one task and specify the storage level // with MEMORY_ONLY_2 so that the rdd result will be cached on both two executors. val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter => myAcc.add(100) iter.map(x => x + 1) }.persist(StorageLevel.MEMORY_ONLY_2) // This will
[jira] [Commented] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers
[ https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646620#comment-17646620 ] Hyukjin Kwon commented on SPARK-41510: -- What about using Conda ([https://www.databricks.com/blog/2020/12/22/how-to-manage-python-dependencies-in-pyspark.html]) or adding python files via --py-files? Would be great to elaborate the usage. > Support easy way for user defined PYTHONPATH in workers > --- > > Key: SPARK-41510 > URL: https://issues.apache.org/jira/browse/SPARK-41510 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.3.1 >Reporter: Ohad Raviv >Priority: Minor > > When working interactively with Spark through notebooks in various envs - > Databricks/YARN I often encounter a very frustrating process of trying to add > new python modules and even change their code without starting a new spark > session/cluster. > In the driver side it is easy to add things like `sys.path.append()` but if > for example, if a UDF code is importing a function from a local module, then > the pickle boundaries will assume that the module exists in the workers, and > fail on "python module does not exist..". > To update the code "online" I can add NFS volume to the workers' PYTHONPATH. > However, setting the PYTHONPATH in the workers is not easy as it gets > overridden by someone (databricks/spark) along the way. a few ugly > workarounds are suggested like running a "dummy" UDF on the workers to add > the folder to the sys.path. > I think all of that could easily be solved if we just add a dedicated > `spark.conf` the will get merged into the worker's PYTHONPATH, just here: > [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94] > > please tell me what you think, and I will make the PR. > thanks. 
> > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
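The "dummy UDF" workaround described in the issue can be sketched as below. Only the path-extending helper is real code here; the NFS directory and the commented-out Spark calls are hypothetical and would need a live SparkSession:

```python
import sys

def extend_worker_path(paths):
    """Append extra module directories to sys.path, skipping duplicates.
    Shipped to each worker via a throwaway job so that later UDF imports
    of local modules resolve on the worker side."""
    for p in paths:
        if p not in sys.path:
            sys.path.append(p)
    return [p for p in paths if p in sys.path]

# Hypothetical usage against a live SparkContext `sc` (oversubscribe the
# partitions so every worker runs the helper at least once):
# n = sc.defaultParallelism * 2
# sc.parallelize(range(n), n) \
#   .mapPartitions(lambda _: extend_worker_path(["/mnt/nfs/my_modules"])) \
#   .collect()
```

This is exactly the "ugly workaround" the reporter wants to avoid; the proposed dedicated configuration merged into the worker's PYTHONPATH would remove the need for the throwaway job entirely.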
[jira] [Comment Edited] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers
[ https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646606#comment-17646606 ] Ohad Raviv edited comment on SPARK-41510 at 12/13/22 12:23 PM: --- [~hvanhovell], [~gurwls223] - can you please look at this/refer that to someone? was (Author: uzadude): [~hvanhovell] - can you please refer that to someone? > Support easy way for user defined PYTHONPATH in workers > --- > > Key: SPARK-41510 > URL: https://issues.apache.org/jira/browse/SPARK-41510 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.3.1 >Reporter: Ohad Raviv >Priority: Minor > > When working interactively with Spark through notebooks in various envs - > Databricks/YARN I often encounter a very frustrating process of trying to add > new python modules and even change their code without starting a new spark > session/cluster. > In the driver side it is easy to add things like `sys.path.append()` but if > for example, if a UDF code is importing a function from a local module, then > the pickle boundaries will assume that the module exists in the workers, and > fail on "python module does not exist..". > To update the code "online" I can add NFS volume to the workers' PYTHONPATH. > However, setting the PYTHONPATH in the workers is not easy as it gets > overridden by someone (databricks/spark) along the way. a few ugly > workarounds are suggested like running a "dummy" UDF on the workers to add > the folder to the sys.path. > I think all of that could easily be solved if we just add a dedicated > `spark.conf` the will get merged into the worker's PYTHONPATH, just here: > [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94] > > please tell me what you think, and I will make the PR. > thanks. 
> > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers
[ https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646606#comment-17646606 ] Ohad Raviv commented on SPARK-41510: [~hvanhovell] - can you please refer that to someone? > Support easy way for user defined PYTHONPATH in workers > --- > > Key: SPARK-41510 > URL: https://issues.apache.org/jira/browse/SPARK-41510 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.3.1 >Reporter: Ohad Raviv >Priority: Minor > > When working interactively with Spark through notebooks in various envs > (Databricks/YARN), I often encounter a very frustrating process of trying to add > new Python modules, and even change their code, without starting a new Spark > session/cluster. > On the driver side it is easy to add things like `sys.path.append()`, but if, > for example, a UDF imports a function from a local module, then the pickle > boundaries will assume that the module exists on the workers, and it fails > with "python module does not exist..". > To update the code "online" I can add an NFS volume to the workers' PYTHONPATH. > However, setting the PYTHONPATH on the workers is not easy, as it gets > overridden by someone (Databricks/Spark) along the way. A few ugly > workarounds are suggested, like running a "dummy" UDF on the workers to add > the folder to sys.path. > I think all of that could easily be solved if we just add a dedicated > `spark.conf` that will get merged into the worker's PYTHONPATH, right here: > [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94] > > Please tell me what you think, and I will make the PR. > Thanks. > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-41360) Avoid BlockManager re-registration if the executor has been lost
[ https://issues.apache.org/jira/browse/SPARK-41360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646605#comment-17646605 ] Apache Spark commented on SPARK-41360: -- User 'HyukjinKwon' has created a pull request for this issue: https://github.com/apache/spark/pull/39052 > Avoid BlockManager re-registration if the executor has been lost > > > Key: SPARK-41360 > URL: https://issues.apache.org/jira/browse/SPARK-41360 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1 >Reporter: wuyi >Assignee: wuyi >Priority: Major > Fix For: 3.2.4, 3.3.2, 3.4.0 > > > We should avoid block manager re-registration if the executor has been lost > as it's meaningless and harmful, e.g., SPARK-35011 -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-41511) LongToUnsafeRowMap support ignoresDuplicatedKey
[ https://issues.apache.org/jira/browse/SPARK-41511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-41511: Assignee: (was: Apache Spark) > LongToUnsafeRowMap support ignoresDuplicatedKey > --- > > Key: SPARK-41511 > URL: https://issues.apache.org/jira/browse/SPARK-41511 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.0 >Reporter: XiDuo You >Priority: Major > > For left semi and left anti hash joins, the duplicated keys of the build side > have no meaning. > Previously, we supported ignoring duplicated keys for UnsafeHashedRelation. We > can also optimize LongHashedRelation. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers
[ https://issues.apache.org/jira/browse/SPARK-41510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ohad Raviv updated SPARK-41510: --- Description: When working interactively with Spark through notebooks in various envs (Databricks/YARN), I often encounter a very frustrating process of trying to add new Python modules, and even change their code, without starting a new Spark session/cluster. On the driver side it is easy to add things like `sys.path.append()`, but if, for example, a UDF imports a function from a local module, then the pickle boundaries will assume that the module exists on the workers, and it fails with "python module does not exist..". To update the code "online" I can add an NFS volume to the workers' PYTHONPATH. However, setting the PYTHONPATH on the workers is not easy, as it gets overridden by someone (Databricks/Spark) along the way. A few ugly workarounds are suggested, like running a "dummy" UDF on the workers to add the folder to sys.path. I think all of that could easily be solved if we just add a dedicated `spark.conf` that will get merged into the worker's PYTHONPATH, right here: [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94] Please tell me what you think, and I will make the PR. Thanks. was: When working interactively with Spark through notebooks in various envs - Databricks/YARN I often encounter a very frustrating process of trying to add new python modules and even change their code without starting a new spark session/cluster. in the driver side it is easy to add things like `sys.path.append()` but if for example UDF code is importing function from a local module, then the pickle boundaries will assume that the module exists in the workers. and then I fail on "python module does not exist..". 
adding NFS volumes to the workers PYTHONPATH could solve it, but it requires restarting the session/cluster and worse doesn't work in all envs as the PYTHONPATH gets overridden by someone (databricks/spark) along the way. a few ugly work around are suggested like running a "dummy" udf on workers to add the folder to the sys.path. I think all of that could easily be solved if we add a spark.conf to add to the worker PYTHONPATH. here: [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94] please tell me what you think, and I will make the PR. thanks. > Support easy way for user defined PYTHONPATH in workers > --- > > Key: SPARK-41510 > URL: https://issues.apache.org/jira/browse/SPARK-41510 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.3.1 >Reporter: Ohad Raviv >Priority: Minor > > When working interactively with Spark through notebooks in various envs > (Databricks/YARN), I often encounter a very frustrating process of trying to add > new Python modules, and even change their code, without starting a new Spark > session/cluster. > On the driver side it is easy to add things like `sys.path.append()`, but if, > for example, a UDF imports a function from a local module, then the pickle > boundaries will assume that the module exists on the workers, and it fails > with "python module does not exist..". > To update the code "online" I can add an NFS volume to the workers' PYTHONPATH. > However, setting the PYTHONPATH on the workers is not easy, as it gets > overridden by someone (Databricks/Spark) along the way. A few ugly > workarounds are suggested, like running a "dummy" UDF on the workers to add > the folder to sys.path. 
> I think all of that could easily be solved if we just add a dedicated > `spark.conf` that will get merged into the worker's PYTHONPATH, right here: > [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94] > > Please tell me what you think, and I will make the PR. > Thanks. > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
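The fix proposed above boils down to merging a user-supplied path list into the PYTHONPATH that PythonWorkerFactory hands to worker processes. A minimal pure-Python sketch of that merge logic (the function name and the idea of user entries taking precedence are assumptions for illustration, not the actual Spark implementation):

```python
import os

def merge_pythonpath(current: str, user_extra: str) -> str:
    """Merge a user-supplied path list into an existing PYTHONPATH,
    putting the user's entries first and dropping duplicates and empties."""
    parts = []
    for p in user_extra.split(os.pathsep) + current.split(os.pathsep):
        if p and p not in parts:
            parts.append(p)
    return os.pathsep.join(parts)

# e.g. what a worker factory could do before spawning a Python worker:
worker_pythonpath = merge_pythonpath("/opt/spark/python", "/mnt/nfs/mymodules")
```

The user's entries come first so that modules mounted over NFS shadow any stale copies already on the worker's path.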
[jira] [Commented] (SPARK-41511) LongToUnsafeRowMap support ignoresDuplicatedKey
[ https://issues.apache.org/jira/browse/SPARK-41511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646600#comment-17646600 ] Apache Spark commented on SPARK-41511: -- User 'ulysses-you' has created a pull request for this issue: https://github.com/apache/spark/pull/39051 > LongToUnsafeRowMap support ignoresDuplicatedKey > --- > > Key: SPARK-41511 > URL: https://issues.apache.org/jira/browse/SPARK-41511 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.0 >Reporter: XiDuo You >Priority: Major > > For left semi and left anti hash joins, the duplicated keys of the build side > have no meaning. > Previously, we supported ignoring duplicated keys for UnsafeHashedRelation. We > can also optimize LongHashedRelation. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-41511) LongToUnsafeRowMap support ignoresDuplicatedKey
[ https://issues.apache.org/jira/browse/SPARK-41511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-41511: Assignee: Apache Spark > LongToUnsafeRowMap support ignoresDuplicatedKey > --- > > Key: SPARK-41511 > URL: https://issues.apache.org/jira/browse/SPARK-41511 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.0 >Reporter: XiDuo You >Assignee: Apache Spark >Priority: Major > > For left semi and left anti hash joins, the duplicated keys of the build side > have no meaning. > Previously, we supported ignoring duplicated keys for UnsafeHashedRelation. We > can also optimize LongHashedRelation. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-41510) Support easy way for user defined PYTHONPATH in workers
Ohad Raviv created SPARK-41510: -- Summary: Support easy way for user defined PYTHONPATH in workers Key: SPARK-41510 URL: https://issues.apache.org/jira/browse/SPARK-41510 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.3.1 Reporter: Ohad Raviv When working interactively with Spark through notebooks in various envs (Databricks/YARN), I often encounter a very frustrating process of trying to add new Python modules, and even change their code, without starting a new Spark session/cluster. On the driver side it is easy to add things like `sys.path.append()`, but if, for example, UDF code imports a function from a local module, then the pickle boundaries will assume that the module exists on the workers, and then I fail on "python module does not exist..". Adding NFS volumes to the workers' PYTHONPATH could solve it, but it requires restarting the session/cluster and, worse, doesn't work in all envs, as the PYTHONPATH gets overridden by someone (Databricks/Spark) along the way. A few ugly workarounds are suggested, like running a "dummy" UDF on the workers to add the folder to sys.path. I think all of that could easily be solved if we add a spark.conf to add to the worker PYTHONPATH, here: [https://github.com/apache/spark/blob/0e2d604fd33c8236cfa8ae243eeaec42d3176a06/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L94] Please tell me what you think, and I will make the PR. Thanks. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-41511) LongToUnsafeRowMap support ignoresDuplicatedKey
XiDuo You created SPARK-41511: - Summary: LongToUnsafeRowMap support ignoresDuplicatedKey Key: SPARK-41511 URL: https://issues.apache.org/jira/browse/SPARK-41511 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.4.0 Reporter: XiDuo You For left semi and left anti hash joins, the duplicated keys of the build side have no meaning. Previously, we supported ignoring duplicated keys for UnsafeHashedRelation. We can also optimize LongHashedRelation. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
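The observation behind SPARK-41511 is that a left semi (or left anti) join only asks whether a build-side key exists, so the hashed relation can safely drop duplicate build keys. A small pure-Python illustration of that property (plain lists and sets standing in for Spark's hashed relations):

```python
def left_semi_join(left_rows, build_keys):
    """Return left rows whose first field appears among the build-side keys.
    Building a set deduplicates the keys, yet the result is identical to
    probing the full multiset: only key existence matters for semi joins."""
    keys = set(build_keys)  # duplicated keys collapse here with no effect
    return [row for row in left_rows if row[0] in keys]

left = [(1, "a"), (2, "b"), (3, "c")]
build_with_dups = [1, 1, 3, 3, 3]   # duplicates carry no extra meaning
matched = left_semi_join(left, build_with_dups)
```

Since the result is unchanged, skipping duplicate keys while building LongToUnsafeRowMap saves memory and insert work without affecting semi/anti join output.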
[jira] [Commented] (SPARK-41412) Implement `Cast`
[ https://issues.apache.org/jira/browse/SPARK-41412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646594#comment-17646594 ] Apache Spark commented on SPARK-41412: -- User 'HyukjinKwon' has created a pull request for this issue: https://github.com/apache/spark/pull/39050 > Implement `Cast` > > > Key: SPARK-41412 > URL: https://issues.apache.org/jira/browse/SPARK-41412 > Project: Spark > Issue Type: Sub-task > Components: Connect >Affects Versions: 3.4.0 >Reporter: Rui Wang >Assignee: Rui Wang >Priority: Major > Fix For: 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-41509) Delay execution hash until after aggregation for semi-join runtime filter.
[ https://issues.apache.org/jira/browse/SPARK-41509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jiaan.geng updated SPARK-41509: --- Description: Currently, Spark runtime filter supports bloom filter and in-subquery filter. The in-subquery filter always executes Murmur3Hash before aggregating the join key. Because the data size before aggregation is larger than after, we can delay executing Murmur3Hash until after aggregation for the semi-join runtime filter; this reduces the number of calls to Murmur3Hash and improves performance. > Delay execution hash until after aggregation for semi-join runtime filter. > -- > > Key: SPARK-41509 > URL: https://issues.apache.org/jira/browse/SPARK-41509 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: jiaan.geng >Priority: Major > > Currently, Spark runtime filter supports bloom filter and in-subquery filter. > The in-subquery filter always executes Murmur3Hash before aggregating the join > key. > Because the data size before aggregation is larger than after, we can delay > executing Murmur3Hash until after aggregation for the semi-join runtime filter; > this reduces the number of calls to Murmur3Hash and improves performance. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
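The win described above comes from hashing each distinct join key once rather than once per input row. A pure-Python sketch of the counting argument (a counted wrapper around Python's built-in hash stands in for Murmur3Hash; the result set is identical either way, only the call count changes):

```python
hash_calls = 0

def murmur3_like(value):
    """Stand-in for Murmur3Hash that counts how often it is invoked."""
    global hash_calls
    hash_calls += 1
    return hash(value)

keys = [1, 1, 2, 2, 2, 3]  # join keys before aggregation (with duplicates)

# hash before aggregation: one call per row
hashed_before = {murmur3_like(k) for k in keys}
calls_before = hash_calls

# hash after aggregation: one call per distinct key
hash_calls = 0
hashed_after = {murmur3_like(k) for k in set(keys)}
calls_after = hash_calls

# hashed_before == hashed_after, but calls_after (3) < calls_before (6)
```

The more skewed the key distribution, the bigger the gap between the two call counts, which is exactly when this reordering pays off.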
[jira] [Commented] (SPARK-41509) Delay execution hash until after aggregation for semi-join runtime filter.
[ https://issues.apache.org/jira/browse/SPARK-41509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646581#comment-17646581 ] Apache Spark commented on SPARK-41509: -- User 'beliefer' has created a pull request for this issue: https://github.com/apache/spark/pull/39049 > Delay execution hash until after aggregation for semi-join runtime filter. > -- > > Key: SPARK-41509 > URL: https://issues.apache.org/jira/browse/SPARK-41509 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: jiaan.geng >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-41509) Delay execution hash until after aggregation for semi-join runtime filter.
[ https://issues.apache.org/jira/browse/SPARK-41509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-41509: Assignee: (was: Apache Spark) > Delay execution hash until after aggregation for semi-join runtime filter. > -- > > Key: SPARK-41509 > URL: https://issues.apache.org/jira/browse/SPARK-41509 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: jiaan.geng >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-41509) Delay execution hash until after aggregation for semi-join runtime filter.
[ https://issues.apache.org/jira/browse/SPARK-41509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-41509: Assignee: Apache Spark > Delay execution hash until after aggregation for semi-join runtime filter. > -- > > Key: SPARK-41509 > URL: https://issues.apache.org/jira/browse/SPARK-41509 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: jiaan.geng >Assignee: Apache Spark >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-41509) Delay execution hash until after aggregation for semi-join runtime filter.
[ https://issues.apache.org/jira/browse/SPARK-41509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646579#comment-17646579 ] Apache Spark commented on SPARK-41509: -- User 'beliefer' has created a pull request for this issue: https://github.com/apache/spark/pull/39049 > Delay execution hash until after aggregation for semi-join runtime filter. > -- > > Key: SPARK-41509 > URL: https://issues.apache.org/jira/browse/SPARK-41509 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: jiaan.geng >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-41509) Delay execution hash until after aggregation for semi-join runtime filter.
jiaan.geng created SPARK-41509: -- Summary: Delay execution hash until after aggregation for semi-join runtime filter. Key: SPARK-41509 URL: https://issues.apache.org/jira/browse/SPARK-41509 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: jiaan.geng -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-41319) when-otherwise support
[ https://issues.apache.org/jira/browse/SPARK-41319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646575#comment-17646575 ] Apache Spark commented on SPARK-41319: -- User 'zhengruifeng' has created a pull request for this issue: https://github.com/apache/spark/pull/38956 > when-otherwise support > -- > > Key: SPARK-41319 > URL: https://issues.apache.org/jira/browse/SPARK-41319 > Project: Spark > Issue Type: Sub-task > Components: Connect >Affects Versions: 3.4.0 >Reporter: Ruifeng Zheng >Priority: Major > > 1, add protobuf message for expression 'CaseWhen'; > 2, support the 'Column.\{when, otherwise\}' methods in Spark Connect. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
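The Column.when/otherwise chain above builds a CaseWhen expression: branches are checked in order, the first matching condition wins, and the otherwise value is the fallback. A plain-Python sketch of those semantics (function and variable names are illustrative, not the Spark Connect API):

```python
def case_when(branches, otherwise=None):
    """Evaluate (condition_fn, value) pairs in order, like SQL CASE WHEN.
    Returns a function that maps an input to the first matching branch's
    value, or to the otherwise value when no branch matches."""
    def apply(x):
        for condition, value in branches:
            if condition(x):
                return value
        return otherwise
    return apply

# rough analogue of F.when(col < 0, "negative").when(col == 0, "zero").otherwise("positive")
label = case_when(
    [(lambda v: v < 0, "negative"), (lambda v: v == 0, "zero")],
    otherwise="positive",
)
```

Note that branch order matters: an input matching several conditions takes the value of the first one, which is why the protobuf message must preserve the branch sequence.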
[jira] [Created] (SPARK-41508) assign name to _LEGACY_ERROR_TEMP_1179 and unwrap the existing SparkThrowable
Yang Jie created SPARK-41508: Summary: assign name to _LEGACY_ERROR_TEMP_1179 and unwrap the existing SparkThrowable Key: SPARK-41508 URL: https://issues.apache.org/jira/browse/SPARK-41508 Project: Spark Issue Type: Sub-task Components: Spark Core, SQL Affects Versions: 3.4.0 Reporter: Yang Jie -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-41508) Assign name to _LEGACY_ERROR_TEMP_1179 and unwrap the existing SparkThrowable
[ https://issues.apache.org/jira/browse/SPARK-41508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yang Jie updated SPARK-41508: - Summary: Assign name to _LEGACY_ERROR_TEMP_1179 and unwrap the existing SparkThrowable (was: assign name to _LEGACY_ERROR_TEMP_1179 and unwrap the existing SparkThrowable) > Assign name to _LEGACY_ERROR_TEMP_1179 and unwrap the existing SparkThrowable > -- > > Key: SPARK-41508 > URL: https://issues.apache.org/jira/browse/SPARK-41508 > Project: Spark > Issue Type: Sub-task > Components: Spark Core, SQL >Affects Versions: 3.4.0 >Reporter: Yang Jie >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-41406) Refactor error message for `NUM_COLUMNS_MISMATCH` to make it more generic
[ https://issues.apache.org/jira/browse/SPARK-41406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Max Gekk reassigned SPARK-41406: Assignee: BingKun Pan > Refactor error message for `NUM_COLUMNS_MISMATCH` to make it more generic > - > > Key: SPARK-41406 > URL: https://issues.apache.org/jira/browse/SPARK-41406 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: BingKun Pan >Assignee: BingKun Pan >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-41406) Refactor error message for `NUM_COLUMNS_MISMATCH` to make it more generic
[ https://issues.apache.org/jira/browse/SPARK-41406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Max Gekk resolved SPARK-41406. -- Fix Version/s: 3.4.0 Resolution: Fixed Issue resolved by pull request 38937 [https://github.com/apache/spark/pull/38937] > Refactor error message for `NUM_COLUMNS_MISMATCH` to make it more generic > - > > Key: SPARK-41406 > URL: https://issues.apache.org/jira/browse/SPARK-41406 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0 >Reporter: BingKun Pan >Assignee: BingKun Pan >Priority: Minor > Fix For: 3.4.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-41507) Correct group of collection_funcs
[ https://issues.apache.org/jira/browse/SPARK-41507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646509#comment-17646509 ] Apache Spark commented on SPARK-41507: -- User 'LuciferYang' has created a pull request for this issue: https://github.com/apache/spark/pull/39045 > Correct group of collection_funcs > - > > Key: SPARK-41507 > URL: https://issues.apache.org/jira/browse/SPARK-41507 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.0 >Reporter: Yang Jie >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-41424) Protobuf serializer for TaskDataWrapper
[ https://issues.apache.org/jira/browse/SPARK-41424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646508#comment-17646508 ] Apache Spark commented on SPARK-41424: -- User 'gengliangwang' has created a pull request for this issue: https://github.com/apache/spark/pull/39048 > Protobuf serializer for TaskDataWrapper > --- > > Key: SPARK-41424 > URL: https://issues.apache.org/jira/browse/SPARK-41424 > Project: Spark > Issue Type: Sub-task > Components: Web UI >Affects Versions: 3.4.0 >Reporter: Gengliang Wang >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-41507) Correct group of collection_funcs
[ https://issues.apache.org/jira/browse/SPARK-41507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-41507: Assignee: (was: Apache Spark) > Correct group of collection_funcs > - > > Key: SPARK-41507 > URL: https://issues.apache.org/jira/browse/SPARK-41507 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.0 >Reporter: Yang Jie >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org