[GitHub] [spark] sadikovi commented on a diff in pull request #37933: [SPARK-40474][SQL] Infer columns with mixed date and timestamp as String in CSV schema inference
sadikovi commented on code in PR #37933:
URL: https://github.com/apache/spark/pull/37933#discussion_r974908544

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala: ##
@@ -224,7 +223,7 @@ class UnivocityParser(
               case NonFatal(e) =>
                 // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
                 // compatibility if enabled.
-                if (!enableParsingFallbackForTimestampType) {
+                if (!enableParsingFallbackForDateType) {

Review Comment:
@xiaonanyang-db Does it mean that we don't have test coverage for it? I am wondering how I managed to merge a PR like this without a unit test. I will open a follow-up PR to improve testing.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] xiaonanyang-db commented on a diff in pull request #37933: [SPARK-40474][SQL] Infer columns with mixed date and timestamp as String in CSV schema inference
xiaonanyang-db commented on code in PR #37933:
URL: https://github.com/apache/spark/pull/37933#discussion_r974909141

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala: ##
@@ -224,7 +223,7 @@ class UnivocityParser(
               case NonFatal(e) =>
                 // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
                 // compatibility if enabled.
-                if (!enableParsingFallbackForTimestampType) {
+                if (!enableParsingFallbackForDateType) {

Review Comment:
That will be nice!
[GitHub] [spark] sadikovi commented on a diff in pull request #37933: [SPARK-40474][SQL] Infer columns with mixed date and timestamp as String in CSV schema inference
sadikovi commented on code in PR #37933:
URL: https://github.com/apache/spark/pull/37933#discussion_r974908544

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala: ##
@@ -224,7 +223,7 @@ class UnivocityParser(
               case NonFatal(e) =>
                 // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
                 // compatibility if enabled.
-                if (!enableParsingFallbackForTimestampType) {
+                if (!enableParsingFallbackForDateType) {

Review Comment:
@xiaonanyang-db Does it mean that we don't have test coverage for it? I am wondering how I managed to merge it like this. I will open a follow-up PR to improve testing.
[GitHub] [spark] xiaonanyang-db commented on pull request #37933: [SPARK-40474][SQL] Infer columns with mixed date and timestamp as String in CSV schema inference
xiaonanyang-db commented on PR #37933:
URL: https://github.com/apache/spark/pull/37933#issuecomment-1251869330

> Can you update the description to list all of the semantics of the change? You can remove the point where we need to merge them to TimestampType if this is not what the PR implements and replace it with "merging to StringType" instead.
>
> Is it correct that the date inference is still controlled by "prefersDate"?

Sure! Yes, it's still controlled by "prefersDate".
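The merging rule discussed above (a column whose values infer as a mix of dates and timestamps becomes a string column rather than being widened to a timestamp) can be sketched as a fold over per-value types. This is a toy model under stated assumptions: the string type names, `merge_types`, and `infer_column_type` are hypothetical and do not mirror Spark's `CSVInferSchema`, and whether a value is inferred as a date in the first place (controlled by `prefersDate`, per the reply above) is not modelled.

```python
from functools import reduce

# Hypothetical, simplified type names; Spark's real CSV inference works on
# DataType objects, not strings.
NULL, INTEGER, DOUBLE, DATE, TIMESTAMP, STRING = (
    "null", "integer", "double", "date", "timestamp", "string")

# Numeric types have a common wider type (included only for contrast).
_WIDENING = {frozenset({INTEGER, DOUBLE}): DOUBLE}


def merge_types(a: str, b: str) -> str:
    """Merge the types inferred for two values of the same column."""
    if a == b:
        return a
    if a == NULL:
        return b
    if b == NULL:
        return a
    widened = _WIDENING.get(frozenset({a, b}))
    if widened is not None:
        return widened
    # Mixed date and timestamp (and any other conflict) falls back to string
    # instead of being widened to timestamp.
    return STRING


def infer_column_type(value_types: list) -> str:
    """Fold per-value types into one column type; an empty column stays null."""
    return reduce(merge_types, value_types, NULL)


print(infer_column_type([DATE, DATE]))       # date
print(infer_column_type([DATE, TIMESTAMP]))  # string
print(infer_column_type([INTEGER, DOUBLE]))  # double
```

Unlike integer and double, date and timestamp are not given a common wider type in this sketch, which is the behaviour the thread describes as "merging to StringType".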
[GitHub] [spark] LuciferYang commented on pull request #37940: [SPARK-40494][CORE][SQL][MLLIB] Optimize the performance of `keys.zipWithIndex.toMap` code pattern
LuciferYang commented on PR #37940:
URL: https://github.com/apache/spark/pull/37940#issuecomment-1251869079

Tested the following code with input sizes `1,5,10,20,50,100,150,200,300,400,500,1000,5000,1,2`:

```
def testZipWithIndexToMap(valuesPerIteration: Int, collectionSize: Int): Unit = {
  val benchmark = new Benchmark(
    s"Test zip with index to map with collectionSize = $collectionSize",
    valuesPerIteration,
    output = output)

  val data = 0 until collectionSize

  benchmark.addCase("Use zipWithIndex + toMap") { _: Int =>
    for (_ <- 0L until valuesPerIteration) {
      val map: Map[Int, Int] = data.zipWithIndex.toMap
    }
  }

  benchmark.addCase("Use zipWithIndex + collection.breakOut") { _: Int =>
    for (_ <- 0L until valuesPerIteration) {
      val map: Map[Int, Int] =
        data.zipWithIndex(collection.breakOut[IndexedSeq[Int], (Int, Int), Map[Int, Int]])
    }
  }

  benchmark.addCase("Use Manual builder") { _: Int =>
    for (_ <- 0L until valuesPerIteration) {
      val map: Map[Int, Int] = zipToMapUseMapBuilder[Int](data)
    }
  }

  benchmark.addCase("Use Manual map") { _: Int =>
    for (_ <- 0L until valuesPerIteration) {
      val map: Map[Int, Int] = zipWithIndexToMapUseMap[Int](data)
    }
  }
  benchmark.run()
}

private def zipToMapUseMapBuilder[K](keys: Iterable[K]): Map[K, Int] = {
  import scala.collection.immutable
  val builder = immutable.Map.newBuilder[K, Int]
  val keyIter = keys.iterator
  var idx = 0
  while (keyIter.hasNext) {
    builder += (keyIter.next(), idx).asInstanceOf[(K, Int)]
    idx = idx + 1
  }
  builder.result()
}

private def zipWithIndexToMapUseMap[K](keys: Iterable[K]): Map[K, Int] = {
  var elems: Map[K, Int] = Map.empty[K, Int]
  val keyIter = keys.iterator
  var idx = 0
  while (keyIter.hasNext) {
    elems += (keyIter.next().asInstanceOf[K] -> idx)
    idx = idx + 1
  }
  elems
}
```

The results are as follows:

**Java 8**

```
OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Test zip with index to map with collectionSize = 1:   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------------------
Use zipWithIndex + toMap                                         41             43           3          2.5         406.8       1.0X
Use zipWithIndex + collection.breakOut                            4              4           0         23.6          42.4       9.6X
Use Manual builder                                                4              4           0         27.8          35.9      11.3X
Use Manual map                                                    3              3           0         37.4          26.8      15.2X

OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Test zip with index to map with collectionSize = 5:   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------------------
Use zipWithIndex + toMap                                        142            143           2          0.7        1421.2       1.0X
Use zipWithIndex + collection.breakOut                          101            102           1          1.0        1011.0       1.4X
Use Manual builder                                               99            101           2          1.0         994.0       1.4X
Use Manual map                                                   49             49           1          2.1         485.6       2.9X

OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Test zip with index to map with collectionSize = 10:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------------------
Use zipWithIndex + toMap                                        166            170           5          0.6        1660.0       1.0X
Use zipWithIndex + collection.breakOut                          123            128           5          0.8        1226.3       1.4X
Use Manual builder                                              121            123           3          0.8        1207.9       1.4X
Use Manual map                                                  102            104           3
```
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark
HeartSaVioR commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974906420

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ##
@@ -311,6 +323,56 @@ object UnsupportedOperationChecker extends Logging {
           }
         }
+        // applyInPandasWithState

Review Comment:
NOTE: this is just a copy and paste of flatMapGroupsWithState since the characteristics are the same for both.
[GitHub] [spark] sadikovi commented on a diff in pull request #37933: [SPARK-40474][SQL] Infer columns with mixed date and timestamp as String in CSV schema inference
sadikovi commented on code in PR #37933:
URL: https://github.com/apache/spark/pull/37933#discussion_r974905472

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala: ##
@@ -224,7 +223,7 @@ class UnivocityParser(
               case NonFatal(e) =>
                 // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
                 // compatibility if enabled.
-                if (!enableParsingFallbackForTimestampType) {
+                if (!enableParsingFallbackForDateType) {

Review Comment:
Thanks for fixing it, the values were swapped.
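The bug fixed in this diff was a swapped flag: the date-parsing branch checked `enableParsingFallbackForTimestampType`. A minimal Python sketch of the intended shape, assuming a strict parse with the configured format followed by an optional legacy retry; `parse_date` and its parameters are hypothetical, and `date.fromisoformat` merely stands in for the 2.0/1.x-compatible parsing path.

```python
from datetime import date, datetime


def parse_date(value: str, fmt: str, enable_fallback_for_date: bool) -> date:
    """Parse `value` with `fmt`, optionally retrying with a legacy parser."""
    try:
        return datetime.strptime(value, fmt).date()
    except ValueError:
        # The date branch must consult the *date* fallback flag; the bug fixed
        # in the diff consulted the timestamp flag here.
        if not enable_fallback_for_date:
            raise
        # Hypothetical stand-in for the 2.0/1.x-compatible parsing path.
        return date.fromisoformat(value)


print(parse_date("19/09/2022", "%d/%m/%Y", enable_fallback_for_date=False))  # 2022-09-19
print(parse_date("2022-09-19", "%d/%m/%Y", enable_fallback_for_date=True))   # 2022-09-19
```

Calling it on the same input with the flag on and off is enough to catch a swapped flag, since only one of the two calls should fail.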
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark
HeartSaVioR commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974905452

## python/pyspark/worker.py: ##
@@ -207,6 +209,65 @@ def wrapped(key_series, value_series):
     return lambda k, v: [(wrapped(k, v), to_arrow_type(return_type))]


+def wrap_grouped_map_pandas_udf_with_state(f, return_type):
+    def wrapped(key_series, value_series_gen, state):
+        import pandas as pd
+
+        key = tuple(s[0] for s in key_series)
+
+        if state.hasTimedOut:
+            # Timeout processing pass empty iterator. Here we return an empty DataFrame instead.
+            values = [
+                pd.DataFrame(columns=pd.concat(next(value_series_gen), axis=1).columns),
+            ]
+        else:
+            values = (pd.concat(x, axis=1) for x in value_series_gen)
+
+        result_iter = f(key, values, state)
+
+        def verify_element(result):
+            if not isinstance(result, pd.DataFrame):
+                raise TypeError(
+                    "The type of element in return iterator of the user-defined function "
+                    "should be pandas.DataFrame, but is {}".format(type(result))
+                )
+            # the number of columns of result have to match the return type
+            # but it is fine for result to have no columns at all if it is empty
+            if not (

Review Comment:
This is borrowed from the function above. I think we used `if not` here because it's more intuitive to describe the "valid" case and apply "not" to reverse it, rather than manually negating each condition (applying De Morgan's laws by hand).
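The readability argument can be checked mechanically: writing the valid case and negating it is equivalent to the hand-expanded form obtained with De Morgan's laws. This sketch replaces the pandas objects with plain values, assuming `num_columns` stands for `len(result.columns)`, `expected_columns` for the number of fields in the return type, and `is_empty` for `result.empty`; the diff is cut off at `if not (`, so the condition is taken from its code comment rather than from the full source.

```python
from itertools import product


def is_valid(num_columns: int, expected_columns: int, is_empty: bool) -> bool:
    # The "valid" case written directly: the column count matches the return
    # type, or the result has no columns at all and is empty.
    return num_columns == expected_columns or (num_columns == 0 and is_empty)


def is_invalid_negated(num_columns: int, expected_columns: int, is_empty: bool) -> bool:
    # Style used in the diff: state the valid case, then apply `not`.
    return not is_valid(num_columns, expected_columns, is_empty)


def is_invalid_expanded(num_columns: int, expected_columns: int, is_empty: bool) -> bool:
    # The same condition after pushing `not` inward by hand (De Morgan's laws).
    return num_columns != expected_columns and (num_columns != 0 or not is_empty)


# Both forms agree on every combination of small inputs.
for cols, expected, empty in product(range(3), range(3), (True, False)):
    assert is_invalid_negated(cols, expected, empty) == is_invalid_expanded(cols, expected, empty)
```

The expanded form is correct but harder to read at a glance, which is the point made in the review comment.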
[GitHub] [spark] dongjoon-hyun commented on pull request #37729: Revert "[SPARK-33861][SQL] Simplify conditional in predicate"
dongjoon-hyun commented on PR #37729:
URL: https://github.com/apache/spark/pull/37729#issuecomment-1251865262

Thank you, @wangyum.
[GitHub] [spark] LuciferYang opened a new pull request, #37940: [SPARK-40494][CORE][SQL][MLLIB] Optimize the performance of `keys.zipWithIndex.toMap` code pattern
LuciferYang opened a new pull request, #37940:
URL: https://github.com/apache/spark/pull/37940

### What changes were proposed in this pull request?
Similar to https://github.com/apache/spark/pull/37876, this PR introduces a new `toMap` method in `o.a.spark.util.collection.Utils` that uses a manual while loop to optimize the performance of the `keys.zipWithIndex.toMap` code pattern in Spark.

### Why are the changes needed?
Performance improvement.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GitHub Actions.
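For readers outside Scala, the pattern being optimized builds a map from each key to its position. A Python analogue (not the PR's Scala helper, which is not shown here; `zip_with_index_to_map` is a hypothetical name) does it in one pass without first materialising the `(key, index)` pairs, which is the idea behind the manual while loop:

```python
def zip_with_index_to_map(keys):
    """Single-pass key -> index map, with no intermediate (key, index) collection."""
    result = {}
    for idx, key in enumerate(keys):
        # A repeated key is overwritten, so it keeps its last index.
        result[key] = idx
    return result


keys = ["a", "b", "a", "c"]
# Same result as materialising the (key, index) pairs first:
assert zip_with_index_to_map(keys) == dict(zip(keys, range(len(keys))))
print(zip_with_index_to_map(keys))  # {'a': 2, 'b': 1, 'c': 3}
```

Like Scala's `toMap`, a repeated key keeps its last index. No performance claim is made for the Python version; the benchmark numbers in this thread are for the Scala variants.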
[GitHub] [spark] wangyum commented on pull request #37729: Revert "[SPARK-33861][SQL] Simplify conditional in predicate"
wangyum commented on PR #37729:
URL: https://github.com/apache/spark/pull/37729#issuecomment-1251857340

OK. https://issues.apache.org/jira/browse/SPARK-40493
[GitHub] [spark] beliefer commented on pull request #37937: [SPARK-40491][SQL] Expose a jdbcRDD function in SparkContext
beliefer commented on PR #37937:
URL: https://github.com/apache/spark/pull/37937#issuecomment-1251853764

> Hm, why do we need this? Can't we do `spark.read.jdbc(...).rdd` or `toDS`?

I know. This PR just follows the legacy documentation of `JdbcRDD`. If we don't need the change, we may need to update the comments.
[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #37924: [SPARK-40481][CORE] Ignore stage fetch failure caused by decommissioned executor
dongjoon-hyun commented on code in PR #37924:
URL: https://github.com/apache/spark/pull/37924#discussion_r974893716

## core/src/main/scala/org/apache/spark/internal/config/package.scala: ##
@@ -2221,6 +2221,14 @@ package object config {
       .checkValue(_ >= 0, "needs to be a non-negative value")
       .createWithDefault(5)

+  private[spark] val STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE =
+    ConfigBuilder("spark.stage.ignoreOnDecommissionFetchFailure")

Review Comment:
Shall we remove the redundant `On`? It would be better because the variable name already has no `On`.
```
- ignoreOnDecommissionFetchFailure
+ ignoreDecommissionFetchFailure
```
[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #37924: [SPARK-40481][CORE] Ignore stage fetch failure caused by decommissioned executor
dongjoon-hyun commented on code in PR #37924:
URL: https://github.com/apache/spark/pull/37924#discussion_r974893716

## core/src/main/scala/org/apache/spark/internal/config/package.scala: ##
@@ -2221,6 +2221,14 @@ package object config {
       .checkValue(_ >= 0, "needs to be a non-negative value")
       .createWithDefault(5)

+  private[spark] val STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE =
+    ConfigBuilder("spark.stage.ignoreOnDecommissionFetchFailure")

Review Comment:
Shall we remove the redundant `On`?
```
- ignoreOnDecommissionFetchFailure
+ ignoreDecommissionFetchFailure
```
[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #37934: [SPARK-40477][SQL] Support `NullType` in `ColumnarBatchRow`
dongjoon-hyun commented on code in PR #37934:
URL: https://github.com/apache/spark/pull/37934#discussion_r974891717

## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala: ##
@@ -1461,10 +1462,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
         vectorizedReader.initBatch(schema, partitionValues)
         vectorizedReader.nextKeyValue()
         val row = vectorizedReader.getCurrentValue.asInstanceOf[InternalRow]
-
-        // Use `GenericMutableRow` by explicitly copying rather than `ColumnarBatch`
-        // in order to use get(...) method which is not implemented in `ColumnarBatch`.
-        val actual = row.copy().get(1, dt)

Review Comment:
Well, if this is a specifically required change, please make it in a separate PR, @kazuyukitanimura.
[GitHub] [spark] cloud-fan commented on a diff in pull request #37679: [SPARK-35242][SQL] Support changing session catalog's default database
cloud-fan commented on code in PR #37679:
URL: https://github.com/apache/spark/pull/37679#discussion_r974888079

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala: ##
@@ -48,9 +48,6 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.{CaseInsensitiveStringMap, PartitioningUtils}
 import org.apache.spark.util.Utils

-object SessionCatalog {
-  val DEFAULT_DATABASE = "default"

Review Comment:
Can we still keep it so that we don't need to hardcode "default"?
[GitHub] [spark] dongjoon-hyun commented on pull request #36096: [SPARK-38803][K8S][TESTS] Lower minio cpu to 250m (0.25) from 1 in K8s IT
dongjoon-hyun commented on PR #36096:
URL: https://github.com/apache/spark/pull/36096#issuecomment-1251844564

This test commit is backported to branch-3.3 according to the community request: https://github.com/apache/spark/pull/36087#issuecomment-1251757187
[GitHub] [spark] dongjoon-hyun commented on pull request #36087: [SPARK-38802][K8S][TESTS] Add Support for `spark.kubernetes.test.(driver|executor)RequestCores`
dongjoon-hyun commented on PR #36087:
URL: https://github.com/apache/spark/pull/36087#issuecomment-1251843627

Sure, @Yikun. This test commit is backported to branch-3.3 according to the community request.
[GitHub] [spark] sunpe commented on pull request #33154: [SPARK-35949][CORE]Add `keep-spark-context-alive` arg for to prevent closing spark context after invoking main for some case
sunpe commented on PR #33154:
URL: https://github.com/apache/spark/pull/33154#issuecomment-1251836832

> Hello @sunpe, thank you for your very fast answer.
>
> Please let me give you some more context. I am using Spark v3.3.0 on K8s with the [Spark on K8S operator](https://github.com/GoogleCloudPlatform/spark-on-k8s-operator). My Spark driver is a Spring Boot application, very similar to the situation you described above, but instead of starting a web server, I subscribe to a RabbitMQ queue. Basically, I would like to have the `SparkSession` available as a `@Bean` in my services.
>
> I tried the following:
>
> 1. Activate `--verbose`
> 2. Rebuild from source (v3.3.0), adding a simple `logWarning` before `SparkContext.getActive.foreach(_.stop())`
>
> Here are my findings:
>
> The Spark Operator pod submits the application using this command:
>
> ```
> submission.go:65] spark-submit arguments: [/opt/spark/bin/spark-submit
> --class org.test.myapplication.MyApplication
> --master k8s://https://10.32.0.1:443
> --deploy-mode cluster
> --conf spark.kubernetes.namespace=my-app-namespace
> --conf spark.app.name=my-application
> --conf spark.kubernetes.driver.pod.name=my-application-driver
> --conf spark.kubernetes.container.image=repo/my-application:latest
> --conf spark.kubernetes.container.image.pullPolicy=Always
> --conf spark.kubernetes.submission.waitAppCompletion=false
> --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/app-name=my-application
> --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/launched-by-spark-operator=true
> --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/submission-id=2b225916-cc12-47cc-a898-8549301fdce4
> --conf spark.driver.cores=1
> --conf spark.kubernetes.driver.request.cores=200m
> --conf spark.driver.memory=512m
> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-operator-spark
> --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/app-name=my-application
> --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/launched-by-spark-operator=true
> --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/submission-id=2b225916-cc12-47cc-a898-8549301fdce4
> --conf spark.executor.instances=1
> --conf spark.executor.cores=1
> --conf spark.executor.memory=512m
> --conf spark.kubernetes.executor.label.app.kubernetes.io/instance=my-app-namespace-my-application
> local:///opt/spark/work-dir/my-application-0.0.1-SNAPSHOT-all.jar]
> ```
>
> When the driver pod starts, I have the following logs:
>
> ```
> (spark.driver.memory,512m)
> (spark.driver.port,7078)
> (spark.executor.cores,1)
> (spark.executor.instances,1)
> (spark.executor.memory,512m)
> […]
> (spark.kubernetes.resource.type,java)
> (spark.kubernetes.submission.waitAppCompletion,false)
> (spark.kubernetes.submitInDriver,true)
> (spark.master,k8s://https://10.32.0.1:443)
> ```
>
> As you can see, `args.master` starts with `k8s`. Once the application has started and the `main()` thread is released, my custom log is printed, the `SparkContext` is closed, and the executor is stopped. As I understand the source code, my primary resource is not `spark-shell`, and the main class is neither `org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver` nor `org.apache.spark.sql.hive.thriftserver.HiveThriftServer2`. It produces the following logs:
>
> ```
> 2022-09-19 13:05:18.621 INFO 30 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#1734b1a:0/SimpleConnection@66859ea9 [delegate=amqp://user@100.64.2.141:5672/, localPort= 49038]
> 2022-09-19 13:05:18.850 INFO 30 --- [ main] MyApplication : Started MyApplication in 18.787 seconds (JVM running for 23.051)
> Warning: SparkContext is going to be stopped!
> 2022-09-19 13:05:18.903 INFO 30 --- [ main] o.s.j.s.AbstractConnector: Stopped Spark@45d389f2{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
> 2022-09-19 13:05:18.914 INFO 30 --- [ main] o.a.s.u.SparkUI : Stopped Spark web UI at http://my-app-ee12f78355d97dc2-driver-svc.my-app-namespace.svc:4040
> 2022-09-19 13:05:18.927 INFO 30 --- [ main] .s.c.k.KubernetesClusterSchedulerBackend : Shutting down all executors
> 2022-09-19 13:05:18.928 INFO 30 --- [rainedScheduler] chedulerBackend$KubernetesDriverEndpoint : Asking each executor to shut down
> 2022-09-19 13:05:18.938 DEBUG 30 --- [ rpc-server-4-1] o.a.s.n.u.NettyLogger: [id: 0x07e91ece, L:/100.64.15.15:7078 - R:/100.64.15.215:40996] WRITE: MessageWithHeader [headerLength: 13, bodyLength: 198]
> 2022-09-19 13:05:18.939 DEBUG 30 --- [ rpc-server-4-1] o.a.s.n.u.NettyLogger: [id: 0x07e91ece, L:/100.64.15.15:7078 - R:/100.64.15.215:40996]
> ```
[GitHub] [spark] HyukjinKwon opened a new pull request, #37939: [MINOR][DOCS][PYTHON] Document datetime.timedelta <> DayTimeIntervalType
HyukjinKwon opened a new pull request, #37939:
URL: https://github.com/apache/spark/pull/37939

### What changes were proposed in this pull request?
This PR proposes to document `datetime.timedelta` support in PySpark on the SQL data type reference page. This support was added in SPARK-37275.

### Why are the changes needed?
To document the support for `datetime.timedelta`.

### Does this PR introduce _any_ user-facing change?
Yes, this fixes the documentation.

### How was this patch tested?
CI in this PR should validate the build.
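A sketch of the `datetime.timedelta` side of this mapping, assuming (not stated in the PR) that a `DayTimeIntervalType` value is held as a whole number of microseconds. The helper names are hypothetical, and PySpark's own conversion code is not reproduced here.

```python
from datetime import timedelta

_ONE_MICROSECOND = timedelta(microseconds=1)


def timedelta_to_micros(td: timedelta) -> int:
    # Floor-dividing one timedelta by another is exact integer arithmetic,
    # so no precision is lost to float rounding.
    return td // _ONE_MICROSECOND


def micros_to_timedelta(micros: int) -> timedelta:
    return timedelta(microseconds=micros)


td = timedelta(days=1, seconds=2, microseconds=3)
print(timedelta_to_micros(td))  # 86402000003
```

Avoiding `td.total_seconds()` keeps the conversion exact for large intervals, where a float would start dropping microseconds.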
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark
HyukjinKwon commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974870249

## python/pyspark/sql/pandas/group_ops.py: ##
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)

+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by the
+        function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in every
+        trigger, and updates to each group's state will be saved across invocations. The function
+        will also be invoked for each timed-out state repeatedly. The sequence of the invocation
+        will be input data -> state timeout. When the function is invoked for state timeout, there
+        will be no data being presented.
+
+        The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and
+        returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
+        of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as
+        :class:`pyspark.sql.streaming.state.GroupStateImpl`.
+
+        For each group, all columns are passed together as `pandas.DataFrame` to the user-function,
+        and the returned `pandas.DataFrame` across all invocations are combined as a
+        :class:`DataFrame`. Note that the user function should loop through and process all
+        elements in the iterator. The user function should not make a guess of the number of
+        elements in the iterator.
+
+        The `outputStructType` should be a :class:`StructType` describing the schema of all
+        elements in returned value, `pandas.DataFrame`. The column labels of all elements in
+        returned value, `pandas.DataFrame` must either match the field names in the defined
+        schema if specified as strings, or match the field data types by position if not strings,
+        e.g. integer indices.
+
+        The `stateStructType` should be :class:`StructType` describing the schema of user-defined
+        state. The value of state will be presented as a tuple, as well as the update should be
+        performed with the tuple. User defined types e.g. native Python class types are not

Review Comment:
I think we can just say that "the corresponding Python types for :class:`DataType` are supported". They are documented at https://spark.apache.org/docs/latest/sql-ref-datatypes.html (click the Python tab).
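A pure-Python sketch of the function contract described in the docstring: the key arrives as a tuple, the function consumes an iterator of batches and yields an iterator of results, and state is read and updated as a tuple. `FakeGroupState` and `running_count` are hypothetical, their attribute names are assumptions rather than the final PySpark API, and lists stand in for `pandas.DataFrame` so the sketch runs without Spark or pandas.

```python
class FakeGroupState:
    """Hypothetical in-memory stand-in for the per-group state object.

    Attribute names are assumptions; check the real GroupState class
    before relying on them.
    """

    def __init__(self):
        self._value = None
        self.hasTimedOut = False

    @property
    def exists(self) -> bool:
        return self._value is not None

    @property
    def get(self) -> tuple:
        return self._value

    def update(self, value: tuple) -> None:
        # The docstring says state is presented and updated as a tuple.
        if not isinstance(value, tuple):
            raise TypeError("state must be updated with a tuple")
        self._value = value


def running_count(key, chunks, state):
    """Keep a running row count per group across triggers.

    `key` is a tuple and `chunks` is an iterator of row batches. Real code
    would receive and yield pandas.DataFrame objects; lists stand in here.
    """
    (count,) = state.get if state.exists else (0,)
    for chunk in chunks:  # consume the whole iterator, as the docstring asks
        count += len(chunk)
    state.update((count,))
    yield [key + (count,)]


state = FakeGroupState()
print(list(running_count(("a",), iter([[1, 2], [3]]), state)))  # [[('a', 3)]]
print(list(running_count(("a",), iter([[4]]), state)))          # [[('a', 4)]]
```

The second call starts from the count saved by the first, which is the cross-trigger behaviour the docstring describes.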
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark
HeartSaVioR commented on code in PR #37893: URL: https://github.com/apache/spark/pull/37893#discussion_r97486 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.python + +import java.io._ + +import scala.collection.JavaConverters._ + +import org.apache.arrow.vector.VectorSchemaRoot +import org.apache.arrow.vector.ipc.ArrowStreamWriter +import org.json4s._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.api.python._ +import org.apache.spark.sql.Row +import org.apache.spark.sql.api.python.PythonSQLUtils +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.python.ApplyInPandasWithStatePythonRunner.{InType, OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER} +import org.apache.spark.sql.execution.python.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA +import org.apache.spark.sql.execution.streaming.GroupStateImpl +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch} + + +/** + * A variant implementation of [[ArrowPythonRunner]] to serve the operation + * applyInPandasWithState. 
+ */ +class ApplyInPandasWithStatePythonRunner( +funcs: Seq[ChainedPythonFunctions], +evalType: Int, +argOffsets: Array[Array[Int]], +inputSchema: StructType, +override protected val timeZoneId: String, +initialWorkerConf: Map[String, String], +stateEncoder: ExpressionEncoder[Row], +keySchema: StructType, +valueSchema: StructType, +stateValueSchema: StructType, +softLimitBytesPerBatch: Long, +minDataCountForSample: Int, +softTimeoutMillsPurgeBatch: Long) + extends BasePythonRunner[InType, OutType](funcs, evalType, argOffsets) + with PythonArrowInput[InType] + with PythonArrowOutput[OutType] { + + override protected val schema: StructType = inputSchema.add("__state", STATE_METADATA_SCHEMA) + + override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback + + override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize + require( +bufferSize >= 4, Review Comment: We can do both: set it to 4 if it is less than 4, and log a warning encouraging users to set it higher.
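The "clamp and warn" suggestion can be sketched as follows. This is a Python illustration of the idea, not the Scala code in the PR. `effective_buffer_size` and the logger name are hypothetical. The minimum of 4 comes from the `require(bufferSize >= 4, ...)` check quoted above.

```python
import logging

logger = logging.getLogger("apply_in_pandas_with_state")  # hypothetical logger name

# Minimum taken from the `require(bufferSize >= 4, ...)` check in the diff.
MIN_BUFFER_SIZE = 4


def effective_buffer_size(configured: int) -> int:
    """Clamp a configured buffer size to the minimum instead of failing."""
    if configured < MIN_BUFFER_SIZE:
        logger.warning(
            "Buffer size %d is below the minimum of %d; using %d instead. "
            "Consider configuring a higher value.",
            configured, MIN_BUFFER_SIZE, MIN_BUFFER_SIZE,
        )
        return MIN_BUFFER_SIZE
    return configured
```

Compared with `require`, this keeps misconfigured jobs running and still points users at the setting through the log.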
[GitHub] [spark] HyukjinKwon closed pull request #37932: [SPARK-40460][SS][3.3] Fix streaming metrics when selecting _metadata
HyukjinKwon closed pull request #37932: [SPARK-40460][SS][3.3] Fix streaming metrics when selecting _metadata URL: https://github.com/apache/spark/pull/37932
[GitHub] [spark] HyukjinKwon commented on pull request #37932: [SPARK-40460][SS][3.3] Fix streaming metrics when selecting _metadata
HyukjinKwon commented on PR #37932: URL: https://github.com/apache/spark/pull/37932#issuecomment-1251799957 Merged to branch-3.3.
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark
HeartSaVioR commented on code in PR #37893: URL: https://github.com/apache/spark/pull/37893#discussion_r974858058 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.python + +import java.io._ + +import scala.collection.JavaConverters._ + +import org.apache.arrow.vector.VectorSchemaRoot +import org.apache.arrow.vector.ipc.ArrowStreamWriter +import org.json4s._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.api.python._ +import org.apache.spark.sql.Row +import org.apache.spark.sql.api.python.PythonSQLUtils +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.python.ApplyInPandasWithStatePythonRunner.{InType, OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER} +import org.apache.spark.sql.execution.python.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA +import org.apache.spark.sql.execution.streaming.GroupStateImpl +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch} + + +/** + * A variant implementation of [[ArrowPythonRunner]] to serve the operation + * applyInPandasWithState. 
+ */ +class ApplyInPandasWithStatePythonRunner( +funcs: Seq[ChainedPythonFunctions], +evalType: Int, +argOffsets: Array[Array[Int]], +inputSchema: StructType, +override protected val timeZoneId: String, +initialWorkerConf: Map[String, String], +stateEncoder: ExpressionEncoder[Row], +keySchema: StructType, +valueSchema: StructType, +stateValueSchema: StructType, +softLimitBytesPerBatch: Long, +minDataCountForSample: Int, +softTimeoutMillsPurgeBatch: Long) + extends BasePythonRunner[InType, OutType](funcs, evalType, argOffsets) + with PythonArrowInput[InType] + with PythonArrowOutput[OutType] { + + override protected val schema: StructType = inputSchema.add("__state", STATE_METADATA_SCHEMA) + + override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback + + override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize + require( +bufferSize >= 4, Review Comment: Never mind. I talked with @HyukjinKwon and now understand how the buffer works (I had misunderstood it): it controls how many small Arrow RecordBatches are buffered and flushed at once for efficiency. An Arrow RecordBatch larger than the buffer is still written as a single Arrow RecordBatch.
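The clarified buffering behaviour can be illustrated with a small model. `BufferedSink` is a hypothetical Python class modeled on the write logic of `java.io.BufferedOutputStream.write(byte[], int, int)`. It assumes two things: the stream to the Python worker is wrapped in such a buffer sized by this setting, and each Arrow RecordBatch arrives as one serialized byte chunk.

```python
class BufferedSink:
    """Buffer small writes and pass large ones through whole.

    A write at least as large as the buffer flushes any pending bytes and then
    reaches the underlying stream as a single piece.
    """

    def __init__(self, buffer_size: int):
        self.buffer_size = buffer_size
        self.pending = bytearray()
        self.flushed = []  # each entry is one write to the underlying stream

    def _flush_pending(self):
        if self.pending:
            self.flushed.append(bytes(self.pending))
            self.pending.clear()

    def write(self, chunk: bytes):
        if len(chunk) >= self.buffer_size:
            # Large record batch: flush what is buffered, then write it as one piece.
            self._flush_pending()
            self.flushed.append(bytes(chunk))
            return
        if len(chunk) > self.buffer_size - len(self.pending):
            # Not enough room left: flush first, then start buffering again.
            self._flush_pending()
        self.pending += chunk

    def close(self):
        self._flush_pending()


sink = BufferedSink(buffer_size=8)
for batch in [b"aaa", b"bbb", b"cccccccccc", b"dd"]:
    sink.write(batch)
sink.close()
# sink.flushed == [b"aaabbb", b"cccccccccc", b"dd"]
```

The two small batches are combined into one flush. The 10-byte batch exceeds the 8-byte buffer but is still written intact.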
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark
HeartSaVioR commented on code in PR #37893: URL: https://github.com/apache/spark/pull/37893#discussion_r974856402 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStateWriter.scala: ## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.python + +import scala.collection.JavaConverters._ + +import org.apache.arrow.vector.{FieldVector, VectorSchemaRoot} +import org.apache.arrow.vector.ipc.ArrowStreamWriter + +import org.apache.spark.sql.Row +import org.apache.spark.sql.api.python.PythonSQLUtils +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} +import org.apache.spark.sql.execution.arrow.ArrowWriter +import org.apache.spark.sql.execution.arrow.ArrowWriter.createFieldWriter +import org.apache.spark.sql.execution.streaming.GroupStateImpl +import org.apache.spark.sql.types.{BinaryType, BooleanType, IntegerType, StringType, StructField, StructType} +import org.apache.spark.unsafe.types.UTF8String + +/** + * This class abstracts the complexity on constructing Arrow RecordBatches for data and state with + * bin-packing and chunking. 
The caller only need to call the proper public methods of this class + * `startNewGroup`, `writeRow`, `finalizeGroup`, `finalizeData` and this class will write the data + * and state into Arrow RecordBatches with performing bin-pack and chunk internally. + * + * This class requires that the parameter `root` has initialized with the Arrow schema like below: + * - data fields + * - state field + * - nested schema (Refer ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA) + * + * Please refer the code comment in the implementation to see how the writes of data and state + * against Arrow RecordBatch work with consideration of bin-packing and chunking. + */ +class ApplyInPandasWithStateWriter( +root: VectorSchemaRoot, +writer: ArrowStreamWriter, +softLimitBytesPerBatch: Long, +minDataCountForSample: Int, +softTimeoutMillsPurgeBatch: Long) { + + import ApplyInPandasWithStateWriter._ + + // We logically group the columns by family (data vs state) and initialize writer separately, + // since it's lot more easier and probably performant to write the row directly rather than + // projecting the row to match up with the overall schema. + // + // The number of data rows and state metadata rows can be different which could be problematic Review Comment: We use a single Arrow RecordBatch for both data and state. I'll mention this, along with the rationale, explicitly in the code comment.
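When data and state share one batch, the data columns and the state columns must have the same number of rows. The hypothetical Python sketch below shows one way to reconcile that. The method names echo the writer's public methods. The layout is an assumption, not necessarily what the PR implements: the state column is non-null only on each group's first row, and it records where the group starts and how many rows it has.

```python
from typing import Any, Dict, List, Optional, Tuple


class SharedBatchWriter:
    """Hypothetical sketch: data rows and per-group state metadata share one batch.

    Every row gets a state slot, so the data and state columns always have the
    same length. Assumes every group has at least one data row.
    """

    def __init__(self) -> None:
        self.data: List[Tuple[Any, ...]] = []
        self.state: List[Optional[Dict[str, Any]]] = []
        self._group: Optional[Tuple[Any, Any, int]] = None

    def start_new_group(self, key: Any, group_state: Any) -> None:
        # Remember where this group's rows begin in the batch.
        self._group = (key, group_state, len(self.data))

    def write_row(self, row: Tuple[Any, ...]) -> None:
        self.data.append(row)
        self.state.append(None)  # placeholder; the group's first slot is set in finalize_group

    def finalize_group(self) -> None:
        key, group_state, start = self._group
        self.state[start] = {
            "key": key,
            "state": group_state,
            "start": start,
            "num_rows": len(self.data) - start,
        }
        self._group = None

    def finalize_data(self) -> Tuple[list, list]:
        batch = (self.data, self.state)
        self.data, self.state = [], []
        return batch


def split_groups(data, state):
    """Reader side: recover (key, state, rows) for each group in one batch."""
    groups = []
    for meta in state:
        if meta is not None:
            rows = data[meta["start"]:meta["start"] + meta["num_rows"]]
            groups.append((meta["key"], meta["state"], rows))
    return groups


w = SharedBatchWriter()
w.start_new_group("a", {"count": 0})
w.write_row(("a", 1))
w.write_row(("a", 2))
w.finalize_group()
w.start_new_group("b", None)
w.write_row(("b", 3))
w.finalize_group()
data, state = w.finalize_data()
```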
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark
HeartSaVioR commented on code in PR #37893: URL: https://github.com/apache/spark/pull/37893#discussion_r974854298 ## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ## @@ -2705,6 +2705,44 @@ object SQLConf { .booleanConf .createWithDefault(false) + val MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH = + buildConf("spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch") + .internal() + .doc("When using applyInPandasWithState, set a soft limit of the accumulated size of " + +"records that can be written to a single ArrowRecordBatch in memory. This is used to " + +"restrict the amount of memory being used to materialize the data in both executor and " + +"Python worker. The accumulated size of records are calculated via sampling a set of " + +"records. Splitting the ArrowRecordBatch is performed per record, so unless a record " + +"is quite huge, the size of constructed ArrowRecordBatch will be around the " + +"configured value.") + .version("3.4.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefaultString("64MB") Review Comment: (closing the loop) We decided to simply use the number of rows as the condition for constructing an Arrow RecordBatch. This removes all of the new configs introduced here and eliminates a lot of complexity.
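Cutting batches purely by row count is simple to express. The Python generator below is a hypothetical sketch of that rule. `chunk_by_row_count` and `max_rows_per_batch` are illustrative names, not the config that replaced the ones above. Small groups are packed into the same batch, and a large group spills over into the next batch. No size sampling is involved.

```python
from typing import Any, Iterable, Iterator, List, Tuple


def chunk_by_row_count(
    groups: Iterable[Tuple[Any, List[Any]]], max_rows_per_batch: int
) -> Iterator[List[Tuple[Any, Any]]]:
    """Yield batches of at most `max_rows_per_batch` (key, row) pairs.

    The only condition for cutting a batch is the number of rows in it.
    """
    if max_rows_per_batch < 1:
        raise ValueError("max_rows_per_batch must be positive")
    batch: List[Tuple[Any, Any]] = []
    for key, rows in groups:
        for row in rows:
            batch.append((key, row))
            if len(batch) == max_rows_per_batch:
                yield batch
                batch = []  # start a fresh list; the yielded one is not mutated
    if batch:
        yield batch


groups = [("a", [1, 2]), ("b", [3, 4, 5]), ("c", [6])]
batches = list(chunk_by_row_count(groups, 4))
# batches == [[("a", 1), ("a", 2), ("b", 3), ("b", 4)], [("b", 5), ("c", 6)]]
```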
[GitHub] [spark] LuciferYang commented on a diff in pull request #37938: [SPARK-40490][YARN][TESTS] Ensure `YarnShuffleIntegrationSuite` tests registeredExecFile reload scenarios
LuciferYang commented on code in PR #37938: URL: https://github.com/apache/spark/pull/37938#discussion_r974854247 ## common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java: ## @@ -237,6 +241,10 @@ protected void serviceInit(Configuration externalConf) throws Exception { boolean stopOnFailure = _conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE); +if (_recoveryPath == null && _conf.getBoolean(INTEGRATION_TESTING, false)) { Review Comment: Using `Utils.isTesting` directly would require fixing some other test cases, for example:
```
test("recovery db should not be created if NM recovery is not enabled") {
  s1 = new YarnShuffleService
  s1.init(yarnConfig)
  s1._recoveryPath should be (null)
  s1.registeredExecutorFile should be (null)
  s1.secretsFile should be (null)
}
```
Isn't it better to use a separate configuration, as this PR does?
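The trade-off can be modeled in a few lines of Python. Both functions are hypothetical. `is_testing` stands in for `Utils.isTesting`, which is assumed to be true in every test run. The config key is the one named in the PR description. With a global testing flag, the unit test above would see a non-null recovery path even though NM recovery is disabled. A dedicated flag only affects suites that opt in.

```python
from typing import Dict, Optional

# Config key named in the PR description.
INTEGRATION_TESTING = "spark.yarn.shuffle.integrationTesting"


def recovery_path_with_global_flag(
    nm_recovery_path: Optional[str], is_testing: bool, temp_dir: str
) -> Optional[str]:
    # Variant 1: every test run gets a recovery path, including unit tests
    # that expect it to stay null.
    if nm_recovery_path is None and is_testing:
        return temp_dir
    return nm_recovery_path


def recovery_path_with_dedicated_flag(
    nm_recovery_path: Optional[str], conf: Dict[str, str], temp_dir: str
) -> Optional[str]:
    # Variant 2: only suites that set the dedicated flag get a recovery path.
    if nm_recovery_path is None and conf.get(INTEGRATION_TESTING, "false") == "true":
        return temp_dir
    return nm_recovery_path
```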
[GitHub] [spark] Yikun commented on pull request #36087: [SPARK-38802][K8S][TESTS] Add Support for `spark.kubernetes.test.(driver|executor)RequestCores`
Yikun commented on PR #36087: URL: https://github.com/apache/spark/pull/36087#issuecomment-1251792098 Here is a simple demo showing why we need them: https://github.com/Yikun/spark-docker/pull/5
- Build the Docker image with tag v3.3.0.
- Run the 3.3.0 K8s integration tests in GitHub Actions.
- Once the tests pass, we can publish 3.3.0 safely.
[GitHub] [spark] LuciferYang commented on a diff in pull request #37938: [SPARK-40490][YARN][TESTS] Ensure `YarnShuffleIntegrationSuite` tests registeredExecFile reload scenarios
LuciferYang commented on code in PR #37938: URL: https://github.com/apache/spark/pull/37938#discussion_r974846876 ## common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java: ## @@ -237,6 +241,10 @@ protected void serviceInit(Configuration externalConf) throws Exception { boolean stopOnFailure = _conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE); +if (_recoveryPath == null && _conf.getBoolean(INTEGRATION_TESTING, false)) { Review Comment: I'm not sure whether it is possible to use `Utils.isTesting` directly. It would require some refactoring, because `Utils.isTesting` cannot be used in this module.
[GitHub] [spark] LuciferYang commented on pull request #37938: [SPARK-40490][YARN][TESTS] Ensure `YarnShuffleIntegrationSuite` tests registeredExecFile reload scenarios
LuciferYang commented on PR #37938: URL: https://github.com/apache/spark/pull/37938#issuecomment-1251782282 cc @tgravescs
[GitHub] [spark] LuciferYang opened a new pull request, #37938: [SPARK-40490][YARN][TESTS] Ensure `YarnShuffleIntegrationSuite` tests registeredExecFile reload scenarios
LuciferYang opened a new pull request, #37938: URL: https://github.com/apache/spark/pull/37938

### What changes were proposed in this pull request?

After SPARK-17321, `YarnShuffleService` persists data to, and reloads data from, the local shuffle state db only when the YARN NodeManager starts with `YarnConfiguration#NM_RECOVERY_ENABLED = true`. `YarnShuffleIntegrationSuite` does not set `YarnConfiguration#NM_RECOVERY_ENABLED`, and the configuration defaults to false. As a result, `YarnShuffleIntegrationSuite` neither triggers data persistence to the db nor verifies that the data is reloaded.

This PR aims to make `YarnShuffleIntegrationSuite` verify the registeredExecFile reload scenarios again. To achieve this, it makes the following changes:

1. Add a new undocumented configuration `spark.yarn.shuffle.integrationTesting` to `YarnShuffleService`, and initialize `_recoveryPath` when `_recoveryPath == null && spark.yarn.shuffle.integrationTesting == true`.
2. Set `spark.yarn.shuffle.integrationTesting = true` only in `YarnShuffleIntegrationSuite`, and add assertions that `registeredExecFile` is not null, so that the registeredExecFile reload scenarios are verified.

### Why are the changes needed?

To fix the registeredExecFile reload test scenarios.

Why not test by setting `YarnConfiguration#NM_RECOVERY_ENABLED` to true? We tried this configuration:

**Hadoop 3.3.4**

```
build/mvn clean install -pl resource-managers/yarn -Pyarn -Dtest=none -DwildcardSuites=org.apache.spark.deploy.yarn.YarnShuffleIntegrationWithLevelDBBackendSuite -Phadoop-3
```

```
2022-09-10T11:44:42.1710230Z Cause: java.lang.ClassNotFoundException: org.apache.hadoop.shaded.org.iq80.leveldb.DBException
2022-09-10T11:44:42.1715234Z at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
2022-09-10T11:44:42.1719347Z at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
2022-09-10T11:44:42.1723090Z at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
2022-09-10T11:44:42.1726759Z at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
2022-09-10T11:44:42.1731028Z at org.apache.hadoop.yarn.server.nodemanager.NodeManager.initAndStartRecoveryStore(NodeManager.java:313)
2022-09-10T11:44:42.1735424Z at org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceInit(NodeManager.java:370)
2022-09-10T11:44:42.1740303Z at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
2022-09-10T11:44:42.1745576Z at org.apache.hadoop.yarn.server.MiniYARNCluster$NodeManagerWrapper.serviceInit(MiniYARNCluster.java:597)
2022-09-10T11:44:42.1828858Z at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
2022-09-10T11:44:42.1829712Z at org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:109)
2022-09-10T11:44:42.1830633Z at org.apache.hadoop.yarn.server.MiniYARNCluster.serviceInit(MiniYARNCluster.java:327)
2022-09-10T11:44:42.1831431Z at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
2022-09-10T11:44:42.1832279Z at org.apache.spark.deploy.yarn.BaseYarnClusterSuite.beforeAll(BaseYarnClusterSuite.scala:112)
```

**Hadoop 2.7.4**

```
build/mvn clean install -pl resource-managers/yarn -Pyarn -Dtest=none -DwildcardSuites=org.apache.spark.deploy.yarn.YarnShuffleIntegrationWithLevelDBBackendSuite -Phadoop-2
```

```
YarnShuffleIntegrationWithLevelDBBackendSuite:
org.apache.spark.deploy.yarn.YarnShuffleIntegrationWithLevelDBBackendSuite *** ABORTED ***
  java.lang.IllegalArgumentException: Cannot support recovery with an ephemeral server port. Check the setting of yarn.nodemanager.address
  at org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl.serviceStart(ContainerManagerImpl.java:395)
  at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
  at org.apache.hadoop.service.CompositeService.serviceStart(CompositeService.java:120)
  at org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceStart(NodeManager.java:272)
  at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
  at org.apache.hadoop.yarn.server.MiniYARNCluster$NodeManagerWrapper.serviceStart(MiniYARNCluster.java:560)
  at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
  at org.apache.hadoop.service.CompositeService.serviceStart(CompositeService.java:120)
  at org.apache.hadoop.yarn.server.MiniYARNCluster.serviceStart(MiniYARNCluster.java:278)
  at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
  ...
Run completed in 3 seconds, 992 milliseconds.
Total number of tests run: 0
Suites: completed 1, aborted 1
Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
*** 1 SUITE ABORTED ***
```
[GitHub] [spark] beliefer opened a new pull request, #37937: [SPARK-40491][SQL] Expose a jdbcRDD function in SparkContext
beliefer opened a new pull request, #37937: URL: https://github.com/apache/spark/pull/37937

### What changes were proposed in this pull request?

According to the legacy documentation of `JdbcRDD`, we should expose a `jdbcRDD` function in `SparkContext`.

### Why are the changes needed?

A `jdbcRDD` function in `SparkContext` is easier for users to use.

### Does this PR introduce _any_ user-facing change?

'No'. It is just a new API.

### How was this patch tested?

New tests.
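For context, `JdbcRDD` splits a query across partitions by binding two `?` placeholders to each partition's lower and upper bounds. The sketch below reproduces that partitioning arithmetic in Python against an in-memory SQLite table. It is modeled on `JdbcRDD.getPartitions`: bounds are inclusive, so the length gets a +1 and each end gets a -1. `partition_bounds` and `read_partitioned` are illustrative helpers, not the proposed `SparkContext` API, whose signature is not shown here.

```python
import sqlite3
from typing import List, Tuple


def partition_bounds(lower: int, upper: int, num_partitions: int) -> List[Tuple[int, int]]:
    """Split the inclusive range [lower, upper] into contiguous inclusive ranges."""
    length = 1 + upper - lower
    return [
        (lower + (i * length) // num_partitions,
         lower + ((i + 1) * length) // num_partitions - 1)
        for i in range(num_partitions)
    ]


def read_partitioned(conn, sql, lower, upper, num_partitions):
    """Run `sql` once per partition, binding its two '?' placeholders to the bounds."""
    return [
        conn.execute(sql, (start, end)).fetchall()
        for start, end in partition_bounds(lower, upper, num_partitions)
    ]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE books (id INTEGER PRIMARY KEY, title TEXT)")
conn.executemany("INSERT INTO books VALUES (?, ?)", [(i, f"book-{i}") for i in range(1, 11)])
parts = read_partitioned(
    conn, "SELECT id, title FROM books WHERE ? <= id AND id <= ? ORDER BY id", 1, 10, 3
)
# partition_bounds(1, 10, 3) == [(1, 3), (4, 6), (7, 10)]
```

Each partition issues an independent query, which is what lets Spark read the ranges in parallel.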
[GitHub] [spark] alex-balikov commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark
alex-balikov commented on code in PR #37893: URL: https://github.com/apache/spark/pull/37893#discussion_r974707164 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.python + +import java.io._ + +import scala.collection.JavaConverters._ + +import org.apache.arrow.vector.VectorSchemaRoot +import org.apache.arrow.vector.ipc.ArrowStreamWriter +import org.json4s._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.api.python._ +import org.apache.spark.sql.Row +import org.apache.spark.sql.api.python.PythonSQLUtils +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.python.ApplyInPandasWithStatePythonRunner.{InType, OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER} +import org.apache.spark.sql.execution.python.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA +import org.apache.spark.sql.execution.streaming.GroupStateImpl +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch} + + +/** + * A variant implementation of [[ArrowPythonRunner]] to serve the operation + * applyInPandasWithState. 
+ */ +class ApplyInPandasWithStatePythonRunner( +funcs: Seq[ChainedPythonFunctions], +evalType: Int, +argOffsets: Array[Array[Int]], +inputSchema: StructType, +override protected val timeZoneId: String, +initialWorkerConf: Map[String, String], +stateEncoder: ExpressionEncoder[Row], +keySchema: StructType, +valueSchema: StructType, +stateValueSchema: StructType, +softLimitBytesPerBatch: Long, +minDataCountForSample: Int, +softTimeoutMillsPurgeBatch: Long) + extends BasePythonRunner[InType, OutType](funcs, evalType, argOffsets) + with PythonArrowInput[InType] + with PythonArrowOutput[OutType] { + + override protected val schema: StructType = inputSchema.add("__state", STATE_METADATA_SCHEMA) + + override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback + + override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize + require( +bufferSize >= 4, +"Pandas execution requires more than 4 bytes. Please set higher buffer. " + + s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.") + + // applyInPandasWithState has its own mechanism to construct the Arrow RecordBatch instance. + // Configurations are both applied to executor and Python worker, set them to the worker conf + // to let Python worker read the config properly. 
+ override protected val workerConf: Map[String, String] = initialWorkerConf + +(SQLConf.MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH.key -> + softLimitBytesPerBatch.toString) + +(SQLConf.MAP_PANDAS_UDF_WITH_STATE_MIN_DATA_COUNT_FOR_SAMPLE.key -> + minDataCountForSample.toString) + +(SQLConf.MAP_PANDAS_UDF_WITH_STATE_SOFT_TIMEOUT_PURGE_BATCH.key -> + softTimeoutMillsPurgeBatch.toString) + + private val stateRowDeserializer = stateEncoder.createDeserializer() + + override protected def handleMetadataBeforeExec(stream: DataOutputStream): Unit = { +super.handleMetadataBeforeExec(stream) +// Also write the schema for state value +PythonRDD.writeUTF(stateValueSchema.json, stream) + } + + protected def writeIteratorToArrowStream( + root: VectorSchemaRoot, + writer: ArrowStreamWriter, + dataOut: DataOutputStream, + inputIterator: Iterator[InType]): Unit = { +val w = new ApplyInPandasWithStateWriter(root, writer, softLimitBytesPerBatch, + minDataCountForSample, softTimeoutMillsPurgeBatch) + +while (inputIterator.hasNext) { + val (keyRow, groupState, dataIter) = inputIterator.next() + assert(dataIter.hasNext, "should have at least one data row!") + w.startNewGroup(keyRow, groupState) + + while (dataIter.hasNext) { +val dataRow = dataIter.next() +w.writeRow(dataRow)
[GitHub] [spark] zhengruifeng commented on a diff in pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS
zhengruifeng commented on code in PR #37918: URL: https://github.com/apache/spark/pull/37918#discussion_r974838677 ## mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala: ## @@ -496,18 +499,23 @@ class ALSModel private[ml] ( .iterator.map { j => (srcId, dstIds(j), scores(j)) } } } - } -// We'll force the IDs to be Int. Unfortunately this converts IDs to Int in the output. -val topKAggregator = new TopByKeyAggregator[Int, Int, Float](num, Ordering.by(_._2)) -val recs = ratings.as[(Int, Int, Float)].groupByKey(_._1).agg(topKAggregator.toColumn) - .toDF("id", "recommendations") + }.toDF(srcOutputColumn, dstOutputColumn, ratingColumn) + +val aggFunc = CollectOrdered(struct(ratingColumn, dstOutputColumn).expr, num, true) + .toAggregateExpression(false) Review Comment: Sure, I don't think we want to expose it, so let's mark it `private[spark]`.
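The hunk above replaces the typed `TopByKeyAggregator` with a `CollectOrdered` aggregate over `struct(rating, dst)`. The underlying idea is to keep only the best `num` candidates per source ID in a bounded heap. It can be sketched in plain Python as below. `top_k_by_src` is a hypothetical helper, not Spark code. The descending (rating, dst) ordering is an assumption about what passing `true` to `CollectOrdered` means.

```python
import heapq
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def top_k_by_src(
    ratings: Iterable[Tuple[int, int, float]], num: int
) -> Dict[int, List[Tuple[int, float]]]:
    """Return the `num` best (dst_id, rating) pairs per src_id, best first.

    Hypothetical helper: entries are compared as (rating, dst_id) tuples, which
    mimics ordering by struct(rating, dst) in descending order.
    """
    if num <= 0:
        return {}
    # One bounded min-heap per source ID; heap[0] is the weakest kept entry.
    heaps: Dict[int, List[Tuple[float, int]]] = defaultdict(list)
    for src, dst, rating in ratings:
        heap = heaps[src]
        entry = (rating, dst)
        if len(heap) < num:
            heapq.heappush(heap, entry)
        elif entry > heap[0]:
            # Evict the weakest entry so at most `num` values are ever kept.
            heapq.heapreplace(heap, entry)
    return {
        src: [(dst, rating) for rating, dst in sorted(heap, reverse=True)]
        for src, heap in heaps.items()
    }


print(top_k_by_src([(1, 10, 0.5), (1, 11, 0.9), (1, 12, 0.7), (2, 20, 0.1)], 2))
# {1: [(11, 0.9), (12, 0.7)], 2: [(20, 0.1)]}
```

Because each heap never grows past `num`, the memory per key stays bounded however many candidates a source ID has.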
[GitHub] [spark] gengliangwang commented on a diff in pull request #37840: [SPARK-40416][SQL] Move subquery expression CheckAnalysis error messages to use the new error framework
gengliangwang commented on code in PR #37840: URL: https://github.com/apache/spark/pull/37840#discussion_r974833681 ## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ## @@ -3911,6 +3911,15 @@ object SQLConf { .checkValues(ErrorMessageFormat.values.map(_.toString)) .createWithDefault(ErrorMessageFormat.PRETTY.toString) + val INCLUDE_PLANS_IN_ERRORS = buildConf("spark.sql.error.includePlans") Review Comment: @dtenedor, let's revert the changes in `SQLConf.scala`.
[GitHub] [spark] LuciferYang commented on pull request #37926: [SPARK-40484][BUILD] Upgrade log4j2 to 2.19.0
LuciferYang commented on PR #37926: URL: https://github.com/apache/spark/pull/37926#issuecomment-1251761360 Thanks @viirya @srowen @dongjoon-hyun
[GitHub] [spark] Yikun commented on pull request #36087: [SPARK-38802][K8S][TESTS] Add Support for `spark.kubernetes.test.(driver|executor)RequestCores`
Yikun commented on PR #36087: URL: https://github.com/apache/spark/pull/36087#issuecomment-1251757187 And also this one: https://github.com/apache/spark/pull/36096
[GitHub] [spark] Yikun commented on pull request #36087: [SPARK-38802][K8S][TESTS] Add Support for `spark.kubernetes.test.(driver|executor)RequestCores`
Yikun commented on PR #36087: URL: https://github.com/apache/spark/pull/36087#issuecomment-1251756266 @dongjoon-hyun Could we backport this to branch-3.3? It would help a lot with running the branch-3.3 K8S tests in GitHub Actions.
[GitHub] [spark] zhengruifeng commented on pull request #37929: [SPARK-40486][PS] Implement `spearman` and `kendall` in `DataFrame.corrwith`
zhengruifeng commented on PR #37929: URL: https://github.com/apache/spark/pull/37929#issuecomment-1251750796 cc @itholic @HyukjinKwon
[GitHub] [spark] Yikun commented on a diff in pull request #37923: [SPARK-40334][PS] Implement `GroupBy.prod`
Yikun commented on code in PR #37923: URL: https://github.com/apache/spark/pull/37923#discussion_r974808692 ## python/pyspark/pandas/groupby.py: ## @@ -993,6 +994,101 @@ def nth(self, n: int) -> FrameLike: return self._prepare_return(DataFrame(internal)) +def prod(self, numeric_only: Optional[bool] = True, min_count: int = 0): Review Comment: ```suggestion def prod(self, numeric_only: Optional[bool] = True, min_count: int = 0) -> FrameLike: ``` ## python/pyspark/pandas/groupby.py: ## @@ -18,7 +18,6 @@ """ A wrapper for GroupedData to behave similar to pandas GroupBy. """ - Review Comment: unrelated change ## python/pyspark/pandas/groupby.py: ## @@ -993,6 +994,101 @@ def nth(self, n: int) -> FrameLike: return self._prepare_return(DataFrame(internal)) +def prod(self, numeric_only: Optional[bool] = True, min_count: int = 0): +""" +Compute prod of groups. + +.. versionadded:: 3.4.0 + +Parameters +-- +numeric_only : bool, default False +Include only float, int, boolean columns. If None, will attempt to use +everything, then use only numeric data. + +min_count: int, default 0 +The required number of valid values to perform the operation. +If fewer than min_count non-NA values are present the result will be NA. + +Returns +--- +pyspark.pandas.Series or pyspark.pandas.DataFrame + +See Also + +pyspark.pandas.Series.groupby +pyspark.pandas.DataFrame.groupby + +Examples + +>>> df = ps.DataFrame({'A': [1, 1, 2, 1, 2], +...'B': [np.nan, 2, 3, 4, 5], +...'C': [1, 2, 1, 1, 2], +...'D': [True, False, True, False, True]}) + +Groupby one column and return the prod of the remaining columns in +each group. 
+ +>>> df.groupby('A').prod().sort_index() + B C D +A +1 8.0 2 0 +2 15.0 2 1 + +>>> df.groupby('A').prod(min_count=3).sort_index() + B C D +A +1 NaN 2.0 0.0 +2 NaN NaN NaN +""" + +self._validate_agg_columns(numeric_only=numeric_only, function_name="prod") + +groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))] +internal, agg_columns, sdf = self._prepare_reduce( +groupkey_names=groupkey_names, +accepted_spark_types=(NumericType, BooleanType), +bool_to_numeric=True, +) + +psdf: DataFrame = DataFrame(internal) +if len(psdf._internal.column_labels) > 0: +tmp_count_column = "__tmp_%s_count_col__" Review Comment: ```suggestion tmp_count_column = verify_temp_column_name(psdf, "__tmp_%s_count_col__") ``` You might want to verify the temporary column name to avoid a potential column name conflict.
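The `min_count` rule in the `prod` docstring can be checked with a small pure-Python sketch. `group_prod` is a hypothetical helper that works on plain lists. It is not the pandas-on-Spark implementation, which, judging from the diff, tracks valid-value counts in a temporary count column. On the docstring's sample data it reproduces the documented `prod()` and `prod(min_count=3)` results.

```python
import math
from typing import Dict, List, Optional, Sequence, Union

Number = Union[int, float, bool]


def group_prod(
    keys: Sequence[int],
    values: Sequence[Optional[Number]],
    min_count: int = 0,
) -> Dict[int, float]:
    """Per-key product of non-missing values, or NaN when fewer than
    `min_count` non-missing values are present (hypothetical helper)."""
    groups: Dict[int, List[float]] = {}
    for key, value in zip(keys, values):
        # Register every key, even one whose values are all missing.
        bucket = groups.setdefault(key, [])
        # Booleans count as 1/0, mirroring bool_to_numeric=True in the diff.
        if value is not None and not math.isnan(value):
            bucket.append(float(value))
    return {
        key: math.prod(bucket) if len(bucket) >= min_count else math.nan
        for key, bucket in groups.items()
    }


A = [1, 1, 2, 1, 2]
B = [math.nan, 2, 3, 4, 5]
print(group_prod(A, B))               # {1: 8.0, 2: 15.0}
print(group_prod(A, B, min_count=3))  # {1: nan, 2: nan}
```

With the default `min_count=0`, an all-missing group yields `1.0`, the empty product. The pandas `min_count` documentation describes the same behavior.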
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark
HeartSaVioR commented on code in PR #37893: URL: https://github.com/apache/spark/pull/37893#discussion_r974805894 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.python + +import java.io._ + +import scala.collection.JavaConverters._ + +import org.apache.arrow.vector.VectorSchemaRoot +import org.apache.arrow.vector.ipc.ArrowStreamWriter +import org.json4s._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.api.python._ +import org.apache.spark.sql.Row +import org.apache.spark.sql.api.python.PythonSQLUtils +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.python.ApplyInPandasWithStatePythonRunner.{InType, OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER} +import org.apache.spark.sql.execution.python.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA +import org.apache.spark.sql.execution.streaming.GroupStateImpl +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch} + + +/** + * A variant implementation of [[ArrowPythonRunner]] to serve the operation + * applyInPandasWithState. 
+ */ +class ApplyInPandasWithStatePythonRunner( +funcs: Seq[ChainedPythonFunctions], +evalType: Int, +argOffsets: Array[Array[Int]], +inputSchema: StructType, +override protected val timeZoneId: String, +initialWorkerConf: Map[String, String], +stateEncoder: ExpressionEncoder[Row], +keySchema: StructType, +valueSchema: StructType, +stateValueSchema: StructType, +softLimitBytesPerBatch: Long, +minDataCountForSample: Int, +softTimeoutMillsPurgeBatch: Long) + extends BasePythonRunner[InType, OutType](funcs, evalType, argOffsets) + with PythonArrowInput[InType] + with PythonArrowOutput[OutType] { + + override protected val schema: StructType = inputSchema.add("__state", STATE_METADATA_SCHEMA) + + override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback + + override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize + require( +bufferSize >= 4, +"Pandas execution requires more than 4 bytes. Please set higher buffer. " + + s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.") + + // applyInPandasWithState has its own mechanism to construct the Arrow RecordBatch instance. + // Configurations are both applied to executor and Python worker, set them to the worker conf + // to let Python worker read the config properly. 
+ override protected val workerConf: Map[String, String] = initialWorkerConf + +(SQLConf.MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH.key -> + softLimitBytesPerBatch.toString) + +(SQLConf.MAP_PANDAS_UDF_WITH_STATE_MIN_DATA_COUNT_FOR_SAMPLE.key -> + minDataCountForSample.toString) + +(SQLConf.MAP_PANDAS_UDF_WITH_STATE_SOFT_TIMEOUT_PURGE_BATCH.key -> + softTimeoutMillsPurgeBatch.toString) + + private val stateRowDeserializer = stateEncoder.createDeserializer() + + override protected def handleMetadataBeforeExec(stream: DataOutputStream): Unit = { +super.handleMetadataBeforeExec(stream) +// Also write the schema for state value +PythonRDD.writeUTF(stateValueSchema.json, stream) + } + + protected def writeIteratorToArrowStream( Review Comment: I agree the parameter order doesn't strictly follow well-known best practice, but changing it would require changing the base class rather than this class. We may need a follow-up JIRA ticket / PR to address this in general.
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark
HeartSaVioR commented on code in PR #37893: URL: https://github.com/apache/spark/pull/37893#discussion_r974803979 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.python + +import java.io._ + +import scala.collection.JavaConverters._ + +import org.apache.arrow.vector.VectorSchemaRoot +import org.apache.arrow.vector.ipc.ArrowStreamWriter +import org.json4s._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.api.python._ +import org.apache.spark.sql.Row +import org.apache.spark.sql.api.python.PythonSQLUtils +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.python.ApplyInPandasWithStatePythonRunner.{InType, OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER} +import org.apache.spark.sql.execution.python.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA +import org.apache.spark.sql.execution.streaming.GroupStateImpl +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch} + + +/** + * A variant implementation of [[ArrowPythonRunner]] to serve the operation + * applyInPandasWithState. 
+ */ +class ApplyInPandasWithStatePythonRunner( +funcs: Seq[ChainedPythonFunctions], +evalType: Int, +argOffsets: Array[Array[Int]], +inputSchema: StructType, +override protected val timeZoneId: String, +initialWorkerConf: Map[String, String], +stateEncoder: ExpressionEncoder[Row], +keySchema: StructType, +valueSchema: StructType, +stateValueSchema: StructType, +softLimitBytesPerBatch: Long, +minDataCountForSample: Int, +softTimeoutMillsPurgeBatch: Long) + extends BasePythonRunner[InType, OutType](funcs, evalType, argOffsets) + with PythonArrowInput[InType] + with PythonArrowOutput[OutType] { + + override protected val schema: StructType = inputSchema.add("__state", STATE_METADATA_SCHEMA) + + override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback + + override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize + require( +bufferSize >= 4, Review Comment: This is borrowed from `ArrowPythonRunner`. By the way, I realized we should not allow the Arrow RecordBatch in this runner to be split due to the buffer size; this runner has to have full control of the Arrow RecordBatch. We'll have to set this to a constant, something like Long.MAX_VALUE.
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark
HeartSaVioR commented on code in PR #37893: URL: https://github.com/apache/spark/pull/37893#discussion_r974798726 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.python + +import java.io._ + +import scala.collection.JavaConverters._ + +import org.apache.arrow.vector.VectorSchemaRoot +import org.apache.arrow.vector.ipc.ArrowStreamWriter +import org.json4s._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.api.python._ +import org.apache.spark.sql.Row +import org.apache.spark.sql.api.python.PythonSQLUtils +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.python.ApplyInPandasWithStatePythonRunner.{InType, OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER} +import org.apache.spark.sql.execution.python.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA +import org.apache.spark.sql.execution.streaming.GroupStateImpl +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch} + + +/** + * A variant implementation of [[ArrowPythonRunner]] to serve the operation + * applyInPandasWithState. + */ +class ApplyInPandasWithStatePythonRunner( +funcs: Seq[ChainedPythonFunctions], +evalType: Int, +argOffsets: Array[Array[Int]], +inputSchema: StructType, +override protected val timeZoneId: String, +initialWorkerConf: Map[String, String], Review Comment: We can't use `workerConf` here because that name is already taken by `override protected val workerConf`. So the choice is between something like `_workerConf` and `initialWorkerConf`, and a descriptive prefix seems better than `_`.
[GitHub] [spark] warrenzhu25 commented on a diff in pull request #37924: [SPARK-40481][CORE] Ignore stage fetch failure caused by decommissioned executor
warrenzhu25 commented on code in PR #37924: URL: https://github.com/apache/spark/pull/37924#discussion_r974793372 ## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ## @@ -1860,8 +1867,18 @@ private[spark] class DAGScheduler( s"(attempt ${failedStage.latestInfo.attemptNumber}) running") } else { failedStage.failedAttemptIds.add(task.stageAttemptId) + val ignoreStageFailure = ignoreDecommissionFetchFailure && +isExecutorDecommissioned(taskScheduler, bmAddress) + if (ignoreStageFailure) { +logInfo("Ignoring fetch failure from $task of $failedStage attempt " + + s"${task.stageAttemptId} when count spark.stage.maxConsecutiveAttempts " + + "as executor ${bmAddress.executorId} is decommissioned and " + + s" ${config.STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE.key}=true") + } + + val shouldAbortStage = -failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || +(!ignoreStageFailure && + failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts) || disallowStageRetryForTest Review Comment: Good catch. Updated.
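The revised abort condition in the `DAGScheduler` hunk above can be restated as a small Python predicate for reasoning about its cases. The parameter names are hypothetical stand-ins for `failedStage.failedAttemptIds.size`, `maxConsecutiveStageAttempts`, `ignoreDecommissionFetchFailure`, `isExecutorDecommissioned(taskScheduler, bmAddress)` and `disallowStageRetryForTest`. This is a sketch of the boolean logic only, not scheduler code.

```python
def should_abort_stage(
    failed_attempts: int,
    max_consecutive_attempts: int,
    ignore_decommission_fetch_failure: bool,
    executor_decommissioned: bool,
    disallow_retry_for_test: bool = False,
) -> bool:
    """Hypothetical restatement of the revised `shouldAbortStage` expression."""
    # The failure is ignored only when the flag is enabled AND the executor
    # the shuffle data was fetched from has been decommissioned.
    ignore_stage_failure = ignore_decommission_fetch_failure and executor_decommissioned
    return (
        not ignore_stage_failure and failed_attempts >= max_consecutive_attempts
    ) or disallow_retry_for_test


print(should_abort_stage(4, 4, True, True))   # False: failure ignored
print(should_abort_stage(4, 4, False, True))  # True: flag disabled
```

In the quoted diff the attempt is still added to `failedAttemptIds`; only the limit check is bypassed. The test-only flag still forces an abort regardless.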
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark
HeartSaVioR commented on code in PR #37893: URL: https://github.com/apache/spark/pull/37893#discussion_r974784396 ## python/pyspark/sql/pandas/serializers.py: ## @@ -371,3 +375,292 @@ def load_stream(self, stream): raise ValueError( "Invalid number of pandas.DataFrames in group {0}".format(dataframes_in_group) ) + + +class ApplyInPandasWithStateSerializer(ArrowStreamPandasUDFSerializer): Review Comment: Done.
[GitHub] [spark] viirya commented on a diff in pull request #37934: [SPARK-40477][SQL] Support `NullType` in `ColumnarBatchRow`
viirya commented on code in PR #37934: URL: https://github.com/apache/spark/pull/37934#discussion_r974783229 ## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala: ## @@ -1461,10 +1462,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession vectorizedReader.initBatch(schema, partitionValues) vectorizedReader.nextKeyValue() val row = vectorizedReader.getCurrentValue.asInstanceOf[InternalRow] - - // Use `GenericMutableRow` by explicitly copying rather than `ColumnarBatch` - // in order to use get(...) method which is not implemented in `ColumnarBatch`. Review Comment: But when it is used for a partition column, `ColumnarBatchRow` is used instead of `GenericMutableRow` (i.e., `copy` won't be called). Is that correct?
[GitHub] [spark] zhengruifeng commented on pull request #37923: [SPARK-40334][PS] Implement `GroupBy.prod`
zhengruifeng commented on PR #37923: URL: https://github.com/apache/spark/pull/37923#issuecomment-1251698604 ``` Oh no! 2 files would be reformatted, 352 files would be left unchanged. Please run 'dev/reformat-python' script. 1 Error: Process completed with exit code 1. ``` `lint-python` failed. You can install the related packages in your environment via `pip install -r dev/requirements.txt`, then reformat the Python files via `dev/reformat-python` and check the lint locally via `dev/lint-python`.
[GitHub] [spark] zhengruifeng commented on a diff in pull request #37923: [SPARK-40334][PS] Implement `GroupBy.prod`
zhengruifeng commented on code in PR #37923: URL: https://github.com/apache/spark/pull/37923#discussion_r974780246 ## python/pyspark/pandas/groupby.py: ## @@ -993,6 +994,101 @@ def nth(self, n: int) -> FrameLike: return self._prepare_return(DataFrame(internal)) +def prod(self, numeric_only: Optional[bool] = True, min_count: int = 0): +""" +Compute prod of groups. + +.. versionadded:: 3.4.0 + +Parameters +-- +numeric_only : bool, default False +Include only float, int, boolean columns. If None, will attempt to use +everything, then use only numeric data. + +min_count: int, default 0 +The required number of valid values to perform the operation. +If fewer than min_count non-NA values are present the result will be NA. + +Returns +--- +pyspark.pandas.Series or pyspark.pandas.DataFrame Review Comment: ```suggestion Series or DataFrame ```
[GitHub] [spark] zhengruifeng commented on a diff in pull request #37923: [SPARK-40334][PS] Implement `GroupBy.prod`
zhengruifeng commented on code in PR #37923: URL: https://github.com/apache/spark/pull/37923#discussion_r974779483 ## python/pyspark/pandas/groupby.py: ## @@ -44,6 +43,7 @@ ) import warnings +import numpy as np Review Comment: `numpy` used in the docstring is already imported in `def _test()`, so there is no need to import it here.
[GitHub] [spark] chaoqin-li1123 commented on pull request #37935: Do maintenance before streaming StateStore unload
chaoqin-li1123 commented on PR #37935: URL: https://github.com/apache/spark/pull/37935#issuecomment-1251692899 @HeartSaVioR
[GitHub] [spark] kazuyukitanimura commented on a diff in pull request #37934: [SPARK-40477][SQL] Support `NullType` in `ColumnarBatchRow`
kazuyukitanimura commented on code in PR #37934: URL: https://github.com/apache/spark/pull/37934#discussion_r974775400 ## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala: ## @@ -1461,10 +1462,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession vectorizedReader.initBatch(schema, partitionValues) vectorizedReader.nextKeyValue() val row = vectorizedReader.getCurrentValue.asInstanceOf[InternalRow] - - // Use `GenericMutableRow` by explicitly copying rather than `ColumnarBatch` - // in order to use get(...) method which is not implemented in `ColumnarBatch`. - val actual = row.copy().get(1, dt) Review Comment: That's correct. `copy()` converts `row` from `ColumnarBatchRow` to `GenericInternalRow`, where null works. With this PR, `null` is supported directly in `ColumnarBatchRow.get()`.
[GitHub] [spark] kazuyukitanimura commented on a diff in pull request #37934: [SPARK-40477][SQL] Support `NullType` in `ColumnarBatchRow`
kazuyukitanimura commented on code in PR #37934: URL: https://github.com/apache/spark/pull/37934#discussion_r974772538 ## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala: ## @@ -1461,10 +1462,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession vectorizedReader.initBatch(schema, partitionValues) vectorizedReader.nextKeyValue() val row = vectorizedReader.getCurrentValue.asInstanceOf[InternalRow] - - // Use `GenericMutableRow` by explicitly copying rather than `ColumnarBatch` - // in order to use get(...) method which is not implemented in `ColumnarBatch`. Review Comment: I think so; `get` has been implemented over time. cc the original author of this test, @HyukjinKwon
[GitHub] [spark] HeartSaVioR closed pull request #37917: [SPARK-40466][SS] Improve the error message when DSv2 is disabled while DSv1 is not avaliable
HeartSaVioR closed pull request #37917: [SPARK-40466][SS] Improve the error message when DSv2 is disabled while DSv1 is not avaliable URL: https://github.com/apache/spark/pull/37917
[GitHub] [spark] HeartSaVioR commented on pull request #37917: [SPARK-40466][SS] Improve the error message when DSv2 is disabled while DSv1 is not avaliable
HeartSaVioR commented on PR #37917: URL: https://github.com/apache/spark/pull/37917#issuecomment-1251667880 Thanks! Merging to master.
[GitHub] [spark] WweiL opened a new pull request, #37936: Add additional tests to StreamingSessionWindowSuite
WweiL opened a new pull request, #37936: URL: https://github.com/apache/spark/pull/37936

## What changes were proposed in this pull request?

Add more complex tests to `StreamingSessionWindowSuite`. Concretely, I created two helper functions:

- `sessionWindowQueryNestedKey`, which converts `sessionId` from the single-word key used in `sessionWindowQuery` to a nested column key. For example: `"hello" -> (("hello", "hello"), "hello")`.
- `sessionWindowQueryMultiColKey`, which converts `sessionId` from the single-word key used in `sessionWindowQuery` to two columns. For example: `"hello" -> col1: ("hello", "hello"), col2: "hello"`.

With these two helper functions, I added variants of the existing tests for `complete mode` and `append mode`, as well as for `cap gap duration` and `async state`. The logic of the tests is unchanged; only the key differs.

For the aggregation test (`session window - with more aggregation functions`), I added more functions to test, and I tried the `first()` and `last()` functions on a nested triple created in the same way as above.

## How was this patch tested?

All changes are tests.
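The two key shapes described in this PR can be illustrated with a small Python sketch. The function names below are hypothetical stand-ins for the Scala test helpers, not code from the PR; the sketch only shows that replacing a single-word key with a nested or multi-column key is injective, so the grouping of events (and hence the expected session results) should stay the same.

```python
from typing import Callable, Dict, Hashable, List, Tuple

# Hypothetical Python stand-ins for the Scala test helpers; not code from the PR.


def nested_key(session_id: str) -> Tuple[Tuple[str, str], str]:
    """Single-word key -> nested key, e.g. "hello" -> (("hello", "hello"), "hello")."""
    return ((session_id, session_id), session_id)


def multi_col_key(session_id: str) -> Dict[str, object]:
    """Single-word key -> two columns, e.g. col1 = ("hello", "hello"), col2 = "hello"."""
    return {"col1": (session_id, session_id), "col2": session_id}


def group_values(
    events: List[Tuple[str, int]], key_fn: Callable[[str], object]
) -> Dict[Hashable, List[int]]:
    """Group event values by the transformed key, preserving arrival order."""
    groups: Dict[Hashable, List[int]] = {}
    for session_id, value in events:
        key = key_fn(session_id)
        # A dict of columns is unhashable, so use its sorted (column, value) pairs as the key.
        hashable = tuple(sorted(key.items())) if isinstance(key, dict) else key
        groups.setdefault(hashable, []).append(value)
    return groups
```

Because each transformation maps distinct session ids to distinct keys, grouping by the raw, nested, or multi-column key partitions the events identically.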
[GitHub] [spark] mengxr commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions
mengxr commented on code in PR #37734: URL: https://github.com/apache/spark/pull/37734#discussion_r974747883 ## python/pyspark/ml/functions.py: ## @@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column: return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col))) +def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]: +"""Generator that splits a pandas dataframe/series into batches.""" +if batch_size <= 0 or batch_size >= len(df): +yield df +else: +# for batch in np.array_split(df, (len(df.index) + batch_size - 1) // batch_size): +for _, batch in df.groupby(np.arange(len(df)) // batch_size): +yield batch + + +def has_tensor_cols(df: pd.DataFrame) -> bool: +"""Check if input DataFrame contains any tensor-valued columns""" +if any(df.dtypes == np.object_): +# pd.DataFrame object types can contain different types, e.g. string, dates, etc. +# so inspect a row and check for array/list type +sample = df.iloc[0] +return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in sample]) +else: +return False + + +def batch_infer_udf( +predict_batch_fn: Callable, +return_type: DataType = ArrayType(FloatType()), +batch_size: int = -1, +input_names: list[str] = [], +input_tensor_shapes: list[list[int]] = [], +**kwargs: Any, +) -> Callable: +"""Given a function which loads a model, returns a pandas_udf for inferencing over that model. + +This will handle: +- conversion of the Spark DataFrame to numpy arrays. +- batching of the inputs sent to the model predict() function. +- caching of the model and prediction function on the executors. + +This assumes that the `predict_batch_fn` encapsulates all of the necessary dependencies for +running the model or the Spark executor environment already satisfies all runtime requirements. + +When selecting columns in pyspark SQL, users are required to always use `struct` for simplicity. 
+ +For the conversion of Spark DataFrame to numpy, the following table describes the behavior, +where tensor columns in the Spark DataFrame must be represented as a flattened 1-D array/list. + +| dataframe \\ model | single input | multiple inputs | +| :- | :--- | :-- | +| single-col scalar | 1| N/A | +| single-col tensor | 1,2 | N/A | +| multi-col scalar | 3| 4 | Review Comment: Discussed with @leewyang offline. We will drop this scenario.
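The quoted `batched` helper buckets row `i` into batch `i // batch_size` via `df.groupby(np.arange(len(df)) // batch_size)`. A minimal stdlib-only sketch of the same batching rule over a plain Python sequence follows; it is an illustration, not the PR's pandas implementation.

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def batched(rows: Sequence[T], batch_size: int = -1) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of `rows` holding at most `batch_size` items each.

    Follows the rule in the quoted helper: a non-positive `batch_size`, or one at
    least as large as the input, yields the whole input as a single batch.
    """
    if batch_size <= 0 or batch_size >= len(rows):
        yield rows
    else:
        # Row i ends up in batch i // batch_size, the same bucketing as
        # df.groupby(np.arange(len(df)) // batch_size) in the quoted diff.
        for start in range(0, len(rows), batch_size):
            yield rows[start:start + batch_size]
```

Slicing contiguous ranges gives the same batches as grouping by `i // batch_size`, without building an index array.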
[GitHub] [spark] dongjoon-hyun commented on pull request #37729: Revert "[SPARK-33861][SQL] Simplify conditional in predicate"
dongjoon-hyun commented on PR #37729: URL: https://github.com/apache/spark/pull/37729#issuecomment-1251641154 Sorry, but I missed that this is an ancient patch. @wangyum, we need a new JIRA when we revert already-released patches.
[GitHub] [spark] viirya commented on a diff in pull request #37934: [SPARK-40477][SQL] Support `NullType` in `ColumnarBatchRow`
viirya commented on code in PR #37934: URL: https://github.com/apache/spark/pull/37934#discussion_r974736080 ## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala: ## @@ -1461,10 +1462,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession vectorizedReader.initBatch(schema, partitionValues) vectorizedReader.nextKeyValue() val row = vectorizedReader.getCurrentValue.asInstanceOf[InternalRow] - - // Use `GenericMutableRow` by explicitly copying rather than `ColumnarBatch` - // in order to use get(...) method which is not implemented in `ColumnarBatch`. - val actual = row.copy().get(1, dt) Review Comment: I can run this test with `null` without this change (L1467).
[GitHub] [spark] viirya commented on a diff in pull request #37934: [SPARK-40477][SQL] Support `NullType` in `ColumnarBatchRow`
viirya commented on code in PR #37934: URL: https://github.com/apache/spark/pull/37934#discussion_r974734194 ## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala: ## @@ -1461,10 +1462,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession vectorizedReader.initBatch(schema, partitionValues) vectorizedReader.nextKeyValue() val row = vectorizedReader.getCurrentValue.asInstanceOf[InternalRow] - - // Use `GenericMutableRow` by explicitly copying rather than `ColumnarBatch` - // in order to use get(...) method which is not implemented in `ColumnarBatch`. Review Comment: Hmm, why does it say `get` is not implemented in `ColumnarBatch`? Is this comment just outdated?
[GitHub] [spark] chaoqin-li1123 opened a new pull request, #37935: Do maintenance before streaming StateStore unload
chaoqin-li1123 opened a new pull request, #37935: URL: https://github.com/apache/spark/pull/37935

### What changes were proposed in this pull request?

Perform a cleanup (maintenance) before a StateStore is unloaded.

### Why are the changes needed?

Currently, maintenance of a StateStore is performed by a periodic task in the management thread. If a streaming query becomes inactive before the next maintenance task fires, its StateStore is unloaded without being cleaned up.

There are two cases in which a StateStore is unloaded:

1. The StateStoreProvider is no longer active in the system, for example when a query ends or the Spark context terminates.
2. Another StateStoreProvider is active in the system, for example when a partition is reassigned.

In case 1, we should do one last round of maintenance before unloading the instance.

### Does this PR introduce _any_ user-facing change?

The shutdown delay of a query may increase because a maintenance task is scheduled.

### How was this patch tested?

Added an integration test that verifies that redundant delta files are deleted when StateStore instances are deactivated and unloaded.
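The unload rule described in this PR (run one final maintenance pass only when the provider is no longer active anywhere) can be modeled in a few lines. This is a hypothetical Python sketch, not Spark's Scala code: `ProviderSketch`, `unload`, and the `still_active` callback are invented names, and the assumption that case 2 skips the extra pass because another active instance keeps receiving periodic maintenance is ours.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class ProviderSketch:
    """Hypothetical stand-in for a loaded StateStoreProvider."""
    name: str
    log: List[str] = field(default_factory=list)

    def do_maintenance(self) -> None:
        # In Spark this would clean up old delta/snapshot files; here we only record it.
        self.log.append("maintenance")

    def close(self) -> None:
        self.log.append("close")


def unload(
    loaded: Dict[str, ProviderSketch], name: str, still_active: Callable[[str], bool]
) -> None:
    """Unload a provider, running one last maintenance pass if it is no longer active."""
    provider = loaded.pop(name)
    if not still_active(name):
        # Case 1: the query ended or the context is shutting down, so the periodic
        # maintenance task may never run again for this provider.
        provider.do_maintenance()
    # Case 2 (assumption): another active instance exists elsewhere and its own
    # periodic maintenance is expected to handle cleanup, so no extra pass here.
    provider.close()
```

For example, unloading a provider whose query has ended records `["maintenance", "close"]`, while a reassigned partition records only `["close"]`.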
[GitHub] [spark] wbo4958 commented on pull request #37855: [SPARK-40407][SQL] Fix the potential data skew caused by df.repartition
wbo4958 commented on PR #37855: URL: https://github.com/apache/spark/pull/37855#issuecomment-1251592287

> > @wbo4958
> > Issue: The xgboost code uses rdd barrier mode, but barrier mode does not work with the `coalesce` operator.

@mridulm just suggested using a different randomization, borrowed from `RDD.coalesce`, to compute the starting position, so it will not introduce any `coalesce` into `repartition`.
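Round-robin repartitioning sends each input task's rows to output partitions in turn, starting from a per-task offset; if the offsets chosen by different tasks are correlated, some output partitions presumably receive more rows than others. The Python sketch below is a hypothetical illustration of that assignment scheme, not Spark's implementation: the seed-scrambling step (CRC32 here) merely stands in for whatever randomization is borrowed from `RDD.coalesce`.

```python
import random
import zlib
from typing import Iterable, List


def start_position(task_index: int, num_partitions: int) -> int:
    """Pick a pseudo-random starting output partition for one input task.

    Assumption for illustration: the task index is scrambled (here with CRC32) before
    it seeds the RNG, so that neighboring task indices do not yield correlated
    starting positions. This is not Spark's code.
    """
    seed = zlib.crc32(task_index.to_bytes(4, "little"))
    return random.Random(seed).randrange(num_partitions)


def round_robin_targets(task_index: int, rows: Iterable[object], num_partitions: int) -> List[int]:
    """Assign each row of one task to an output partition, cycling from the start position."""
    position = start_position(task_index, num_partitions)
    targets: List[int] = []
    for _ in rows:
        position = (position + 1) % num_partitions
        targets.append(position)
    return targets
```

Within a single task, cycling guarantees that partition counts differ by at most one; the choice of starting position only matters for how the tasks' contributions add up across partitions. The seeding is deterministic, so a retried task reproduces the same assignment.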
[GitHub] [spark] dongjoon-hyun closed pull request #37424: [SPARK-39991][SQL][AQE] Use available column statistics from completed query stages
dongjoon-hyun closed pull request #37424: [SPARK-39991][SQL][AQE] Use available column statistics from completed query stages URL: https://github.com/apache/spark/pull/37424
[GitHub] [spark] AmplabJenkins commented on pull request #37922: [WIP][SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished
AmplabJenkins commented on PR #37922: URL: https://github.com/apache/spark/pull/37922#issuecomment-1251551259 Can one of the admins verify this patch?
[GitHub] [spark] AmplabJenkins commented on pull request #37923: [SPARK-40334][PS] Implement `GroupBy.prod`
AmplabJenkins commented on PR #37923: URL: https://github.com/apache/spark/pull/37923#issuecomment-1251551212 Can one of the admins verify this patch?
[GitHub] [spark] AmplabJenkins commented on pull request #37924: [SPARK-40481][CORE] Ignore stage fetch failure caused by decommissioned executor
AmplabJenkins commented on PR #37924: URL: https://github.com/apache/spark/pull/37924#issuecomment-1251551174 Can one of the admins verify this patch?
[GitHub] [spark] alex-balikov commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark
alex-balikov commented on code in PR #37893: URL: https://github.com/apache/spark/pull/37893#discussion_r974680745 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.python + +import java.io._ + +import scala.collection.JavaConverters._ + +import org.apache.arrow.vector.VectorSchemaRoot +import org.apache.arrow.vector.ipc.ArrowStreamWriter +import org.json4s._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.api.python._ +import org.apache.spark.sql.Row +import org.apache.spark.sql.api.python.PythonSQLUtils +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.python.ApplyInPandasWithStatePythonRunner.{InType, OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER} +import org.apache.spark.sql.execution.python.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA +import org.apache.spark.sql.execution.streaming.GroupStateImpl +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch} + + +/** + * A variant implementation of [[ArrowPythonRunner]] to serve the operation + * applyInPandasWithState. 
+ */ +class ApplyInPandasWithStatePythonRunner( +funcs: Seq[ChainedPythonFunctions], +evalType: Int, +argOffsets: Array[Array[Int]], +inputSchema: StructType, +override protected val timeZoneId: String, +initialWorkerConf: Map[String, String], +stateEncoder: ExpressionEncoder[Row], +keySchema: StructType, +valueSchema: StructType, +stateValueSchema: StructType, +softLimitBytesPerBatch: Long, +minDataCountForSample: Int, +softTimeoutMillsPurgeBatch: Long) + extends BasePythonRunner[InType, OutType](funcs, evalType, argOffsets) + with PythonArrowInput[InType] + with PythonArrowOutput[OutType] { + + override protected val schema: StructType = inputSchema.add("__state", STATE_METADATA_SCHEMA) + + override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback + + override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize + require( +bufferSize >= 4, Review Comment: Why do we need to throw this exception? We could instead just ensure that the buffer size is at least 4: `override val bufferSize: Int = math.max(4, SQLConf.get.pandasUDFBufferSize)` ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.execution.python + +import java.io._ + +import scala.collection.JavaConverters._ + +import org.apache.arrow.vector.VectorSchemaRoot +import org.apache.arrow.vector.ipc.ArrowStreamWriter +import org.json4s._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.api.python._ +import org.apache.spark.sql.Row +import org.apache.spark.sql.api.python.PythonSQLUtils +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import
[GitHub] [spark] alex-balikov commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark
alex-balikov commented on code in PR #37893: URL: https://github.com/apache/spark/pull/37893#discussion_r974672806 ## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ## @@ -2705,6 +2705,44 @@ object SQLConf { .booleanConf .createWithDefault(false) + val MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH = + buildConf("spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch") + .internal() + .doc("When using applyInPandasWithState, set a soft limit of the accumulated size of " + +"records that can be written to a single ArrowRecordBatch in memory. This is used to " + +"restrict the amount of memory being used to materialize the data in both executor and " + +"Python worker. The accumulated size of records are calculated via sampling a set of " + +"records. Splitting the ArrowRecordBatch is performed per record, so unless a record " + +"is quite huge, the size of constructed ArrowRecordBatch will be around the " + +"configured value.") + .version("3.4.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefaultString("64MB") + + val MAP_PANDAS_UDF_WITH_STATE_MIN_DATA_COUNT_FOR_SAMPLE = Review Comment: I wonder whether we really need this param. Ultimately, if the size estimate works poorly, users can just set a lower value for the batch size limit. I do not think it is useful to let them tune this parameter.
## sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala: ## @@ -620,6 +622,35 @@ class RelationalGroupedDataset protected[sql]( Dataset.ofRows(df.sparkSession, plan) } + private[sql] def applyInPandasWithState( Review Comment: Please add a method-level comment. ## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ## @@ -2705,6 +2705,44 @@ object SQLConf { .booleanConf .createWithDefault(false) + val MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH = + buildConf("spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch") + .internal() + .doc("When using applyInPandasWithState, set a soft limit of the accumulated size of " + +"records that can be written to a single ArrowRecordBatch in memory. This is used to " + +"restrict the amount of memory being used to materialize the data in both executor and " + +"Python worker. The accumulated size of records are calculated via sampling a set of " + +"records. Splitting the ArrowRecordBatch is performed per record, so unless a record " + +"is quite huge, the size of constructed ArrowRecordBatch will be around the " + +"configured value.") + .version("3.4.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefaultString("64MB") Review Comment: I agree that expressing the limit in bytes is more meaningful than in records. However, we estimate the byte size efficiently. Specifically, I would rename 'softLimitSizePerBatch' by removing 'soft' (we can clarify that in the doc) and by including 'Bytes', i.e. 'batchSizeLimitBytes'. I also wonder whether this property should be specific to applyInPandasWithState or made general: we could remove the applyInPandasWithState scoping even if we do not support this limit elsewhere initially. It seems generally meaningful, and we can follow up by fixing the other places as a bug.
## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ## @@ -2705,6 +2705,44 @@ object SQLConf { .booleanConf .createWithDefault(false) + val MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH = + buildConf("spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch") + .internal() + .doc("When using applyInPandasWithState, set a soft limit of the accumulated size of " + +"records that can be written to a single ArrowRecordBatch in memory. This is used to " + +"restrict the amount of memory being used to materialize the data in both executor and " + +"Python worker. The accumulated size of records are calculated via sampling a set of " + +"records. Splitting the ArrowRecordBatch is performed per record, so unless a record " + +"is quite huge, the size of constructed ArrowRecordBatch will be around the " + +"configured value.") + .version("3.4.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefaultString("64MB") + + val MAP_PANDAS_UDF_WITH_STATE_MIN_DATA_COUNT_FOR_SAMPLE = + buildConf("spark.sql.execution.applyInPandasWithState.minDataCountForSample") + .internal() + .doc("When using applyInPandasWithState, specify the minimum number of records to sample " + +"the size of record. The size being retrieved from sampling will be used to estimate " + +"the accumulated size of records. Note that limiting by size does not
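The two configs under review describe cutting Arrow batches by a byte budget, where the accumulated size is estimated from the sizes of a sample of at least `minDataCountForSample` records. The following hypothetical Python sketch shows that idea in isolation; the function and parameter names are invented, and Spark's actual sampling and batching logic may differ (for example, in whether or how often it re-samples).

```python
from typing import Callable, Iterable, Iterator, List


def split_by_soft_limit(
    records: Iterable[bytes],
    size_of: Callable[[bytes], int],
    soft_limit_bytes: int,
    min_sample: int,
) -> Iterator[List[bytes]]:
    """Group records into batches whose estimated size stays near `soft_limit_bytes`.

    The first `min_sample` records are measured with `size_of`; every later record is
    assumed to have the average sampled size. Batches are only cut between records,
    so one very large record can still push a batch past the limit (hence "soft").
    """
    min_sample = max(1, min_sample)  # need at least one measurement to average
    sampled_total, sampled_count = 0.0, 0
    batch: List[bytes] = []
    batch_bytes = 0.0
    for record in records:
        if sampled_count < min_sample:
            size = float(size_of(record))  # still sampling: measure this record
            sampled_total += size
            sampled_count += 1
        else:
            size = sampled_total / sampled_count  # estimate from the sample average
        if batch and batch_bytes + size > soft_limit_bytes:
            yield batch  # adding this record would exceed the limit: close the batch
            batch, batch_bytes = [], 0.0
        batch.append(record)
        batch_bytes += size
    if batch:
        yield batch
```

With ten 10-byte records and a 25-byte limit, this yields five batches of two records each; a single record larger than the limit still forms its own batch.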
[GitHub] [spark] MaxGekk commented on pull request #37921: [SPARK-40479][SQL] Migrate unexpected input type error to an error class
MaxGekk commented on PR #37921: URL: https://github.com/apache/spark/pull/37921#issuecomment-1251528429 @srielau @anchovYu Could you take a look at the PR, please?
[GitHub] [spark] leewyang commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions
leewyang commented on code in PR #37734: URL: https://github.com/apache/spark/pull/37734#discussion_r974649030 ## python/pyspark/ml/functions.py: ## @@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column: return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col))) +def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]: +"""Generator that splits a pandas dataframe/series into batches.""" +if batch_size <= 0 or batch_size >= len(df): +yield df +else: +# for batch in np.array_split(df, (len(df.index) + batch_size - 1) // batch_size): +for _, batch in df.groupby(np.arange(len(df)) // batch_size): +yield batch + + +def has_tensor_cols(df: pd.DataFrame) -> bool: +"""Check if input DataFrame contains any tensor-valued columns""" +if any(df.dtypes == np.object_): +# pd.DataFrame object types can contain different types, e.g. string, dates, etc. +# so inspect a row and check for array/list type +sample = df.iloc[0] +return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in sample]) +else: +return False + + +def batch_infer_udf( +predict_batch_fn: Callable, +return_type: DataType = ArrayType(FloatType()), +batch_size: int = -1, +input_names: list[str] = [], Review Comment: - The mutable `[]` default is fixed. - For now, `input_names` is an explicit signal to the code that the user expects multiple named inputs; for most simple (single-input) cases, it wouldn't be used or required.
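The "mutable `[]`" issue concerns Python evaluating a default argument once, when the function is defined, so a default list is shared across calls. A minimal sketch of the pitfall and the usual `None`-default fix follows; both function names are hypothetical, and the PR's actual fix is not shown in the quoted diff.

```python
from typing import List, Optional


def append_shared(name: str, names: List[str] = []) -> List[str]:
    # The default list is created once, at definition time, and reused by every call.
    names.append(name)
    return names


def append_fresh(name: str, names: Optional[List[str]] = None) -> List[str]:
    # A new list is created on every call that omits `names`.
    if names is None:
        names = []
    names.append(name)
    return names
```

Calling `append_shared("a")` and then `append_shared("b")` returns `["a", "b"]`, while the same two calls to `append_fresh` return `["a"]` and `["b"]`.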
[GitHub] [spark] leewyang commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions
leewyang commented on code in PR #37734: URL: https://github.com/apache/spark/pull/37734#discussion_r974646917 ## python/pyspark/ml/functions.py: ## @@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column: return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col))) +def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]: +"""Generator that splits a pandas dataframe/series into batches.""" +if batch_size <= 0 or batch_size >= len(df): +yield df +else: +# for batch in np.array_split(df, (len(df.index) + batch_size - 1) // batch_size): +for _, batch in df.groupby(np.arange(len(df)) // batch_size): +yield batch + + +def has_tensor_cols(df: pd.DataFrame) -> bool: +"""Check if input DataFrame contains any tensor-valued columns""" +if any(df.dtypes == np.object_): +# pd.DataFrame object types can contain different types, e.g. string, dates, etc. +# so inspect a row and check for array/list type +sample = df.iloc[0] +return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in sample]) +else: +return False + + +def batch_infer_udf( +predict_batch_fn: Callable, +return_type: DataType = ArrayType(FloatType()), +batch_size: int = -1, +input_names: list[str] = [], +input_tensor_shapes: list[list[int]] = [], +**kwargs: Any, +) -> Callable: +"""Given a function which loads a model, returns a pandas_udf for inferencing over that model. + +This will handle: +- conversion of the Spark DataFrame to numpy arrays. +- batching of the inputs sent to the model predict() function. +- caching of the model and prediction function on the executors. + +This assumes that the `predict_batch_fn` encapsulates all of the necessary dependencies for +running the model or the Spark executor environment already satisfies all runtime requirements. + +When selecting columns in pyspark SQL, users are required to always use `struct` for simplicity. 
+ +For the conversion of Spark DataFrame to numpy, the following table describes the behavior, +where tensor columns in the Spark DataFrame must be represented as a flattened 1-D array/list. + +| dataframe \\ model | single input | multiple inputs | +| :- | :--- | :-- | +| single-col scalar | 1| N/A | +| single-col tensor | 1,2 | N/A | +| multi-col scalar | 3| 4 | Review Comment: I think it's fine to leave this for now. Otherwise, we would just return an error in this case telling users to use `array`, so it seems better to support it than to raise an error (unless this causes issues elsewhere).
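Because the docstring requires tensor-valued columns to arrive as flattened 1-D arrays/lists, each value presumably has to be restored to the shape the model expects, which appears to be the role of `input_tensor_shapes`. A stdlib-only sketch of that step for a single row follows; `unflatten` is a hypothetical helper, and row-major (C) order is assumed, matching NumPy's default for `reshape`.

```python
from math import prod
from typing import List, Sequence


def unflatten(flat: Sequence[float], shape: Sequence[int]) -> List:
    """Rebuild a nested list of the given shape from a flattened 1-D value (row-major)."""
    if not shape:
        raise ValueError("shape must have at least one dimension")
    if prod(shape) != len(flat):
        raise ValueError(f"cannot reshape {len(flat)} values into shape {list(shape)}")
    if len(shape) == 1:
        return list(flat)
    if shape[0] == 0:
        return []
    step = len(flat) // shape[0]  # number of values per slice along the first axis
    return [unflatten(flat[i * step:(i + 1) * step], shape[1:]) for i in range(shape[0])]
```

For example, a flattened value `[1, 2, 3, 4, 5, 6]` with shape `[2, 3]` becomes `[[1, 2, 3], [4, 5, 6]]`.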
[GitHub] [spark] xinrong-meng commented on pull request #37908: [SPARK-40196][PS][FOLLOWUP] `SF.lit` -> `F.lit` in `window.quantile`
xinrong-meng commented on PR #37908: URL: https://github.com/apache/spark/pull/37908#issuecomment-1251489189 Thank you!
[GitHub] [spark] leewyang commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions
leewyang commented on code in PR #37734: URL: https://github.com/apache/spark/pull/37734#discussion_r974626628 ## python/pyspark/ml/functions.py: ## @@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column: return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col))) +def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]: +"""Generator that splits a pandas dataframe/series into batches.""" +if batch_size <= 0 or batch_size >= len(df): +yield df +else: +# for batch in np.array_split(df, (len(df.index) + batch_size - 1) // batch_size): +for _, batch in df.groupby(np.arange(len(df)) // batch_size): +yield batch + + +def has_tensor_cols(df: pd.DataFrame) -> bool: +"""Check if input DataFrame contains any tensor-valued columns""" +if any(df.dtypes == np.object_): +# pd.DataFrame object types can contain different types, e.g. string, dates, etc. +# so inspect a row and check for array/list type +sample = df.iloc[0] +return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in sample]) +else: +return False + + +def batch_infer_udf( +predict_batch_fn: Callable, +return_type: DataType = ArrayType(FloatType()), +batch_size: int = -1, +input_names: list[str] = [], +input_tensor_shapes: list[list[int]] = [], +**kwargs: Any, +) -> Callable: +"""Given a function which loads a model, returns a pandas_udf for inferencing over that model. + +This will handle: +- conversion of the Spark DataFrame to numpy arrays. +- batching of the inputs sent to the model predict() function. +- caching of the model and prediction function on the executors. + +This assumes that the `predict_batch_fn` encapsulates all of the necessary dependencies for +running the model or the Spark executor environment already satisfies all runtime requirements. + +When selecting columns in pyspark SQL, users are required to always use `struct` for simplicity. 
Review Comment: Updated the code to support `col` and `str` column selections, so `struct` is not a requirement any more. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
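The batching contract of the `batched` helper quoted above can be illustrated without pandas or NumPy. This is a minimal sketch over a plain Python sequence, assuming only the behavior visible in the diff (non-positive or oversized `batch_size` yields everything at once). `batched_rows` is a hypothetical name and is not part of `pyspark.ml.functions`.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def batched_rows(rows: Sequence[T], batch_size: int = -1) -> Iterator[List[T]]:
    """Yield consecutive chunks of `rows`, each holding at most `batch_size` items.

    Follows the contract of `batched` in the diff: a non-positive batch_size, or one
    at least as large as the input, yields the whole input as a single batch.
    """
    if batch_size <= 0 or batch_size >= len(rows):
        yield list(rows)
        return
    # Consecutive slices, equivalent to grouping row i under key i // batch_size.
    for start in range(0, len(rows), batch_size):
        yield list(rows[start:start + batch_size])


# Usage: five rows in batches of two.
print(list(batched_rows([1, 2, 3, 4, 5], batch_size=2)))  # [[1, 2], [3, 4], [5]]
```

Slicing by position keeps the batches in input order, which is the same grouping the diff obtains with `df.groupby(np.arange(len(df)) // batch_size)`.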
[GitHub] [spark] kazuyukitanimura commented on pull request #37934: [SPARK-40477][SQL] Support `NullType` in `ColumnarBatchRow`
kazuyukitanimura commented on PR #37934: URL: https://github.com/apache/spark/pull/37934#issuecomment-1251489126 cc @sunchao @viirya @flyrain
[GitHub] [spark] xinrong-meng commented on pull request #37912: [SPARK-40196][PYTHON][PS][FOLLOWUP] Skip SparkFunctionsTests.test_repeat
xinrong-meng commented on PR #37912: URL: https://github.com/apache/spark/pull/37912#issuecomment-1251488032 Thank you!
[GitHub] [spark] xinrong-meng commented on pull request #37888: [SPARK-40196][PYTHON][PS] Consolidate `lit` function with NumPy scalar in sql and pandas module
xinrong-meng commented on PR #37888: URL: https://github.com/apache/spark/pull/37888#issuecomment-1251487641 Thank you @HyukjinKwon @zhengruifeng @Yikun for taking care of the merging!
[GitHub] [spark] kazuyukitanimura opened a new pull request, #37934: [SPARK-40477][SQL] Support `NullType` in `ColumnarBatchRow`
kazuyukitanimura opened a new pull request, #37934: URL: https://github.com/apache/spark/pull/37934 ### What changes were proposed in this pull request? This PR proposes to support `NullType` in `ColumnarBatchRow`. ### Why are the changes needed? `ColumnarBatchRow.get()` currently does not support `NullType`. Supporting `NullType` in `ColumnarBatchRow` allows `NullType` to be used, for example, for partition columns. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Test added
[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies
grundprinzip commented on code in PR #37710: URL: https://github.com/apache/spark/pull/37710#discussion_r974611991 ## connect/src/main/scala/org/apache/spark/sql/sparkconnect/planner/SparkConnectPlanner.scala: ## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sparkconnect.planner + +import scala.collection.JavaConverters._ + +import org.apache.spark.annotation.Experimental +import org.apache.spark.connect.proto +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.{expressions, plans} +import org.apache.spark.sql.catalyst.analysis.{ + UnresolvedAlias, + UnresolvedAttribute, + UnresolvedFunction, + UnresolvedRelation, + UnresolvedStar Review Comment: Basically, if I run ``` import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} ``` through ``` ./build/mvn -Pscala-2.12 scalafmt:format -Dscalafmt.skip=false -Dscalafmt.onlyChanged=false -pl connect ``` it becomes this.
[GitHub] [spark] mridulm commented on a diff in pull request #37924: [SPARK-40481][CORE] Ignore stage fetch failure caused by decommissioned executor
mridulm commented on code in PR #37924: URL: https://github.com/apache/spark/pull/37924#discussion_r974600104 ## docs/configuration.md: ## @@ -2605,6 +2605,15 @@ Apart from these, the following properties are also available, and may be useful 2.2.0 + + spark.stage.attempt.ignoreOnDecommissionFetchFailure Review Comment: `spark.stage.attempt.ignoreOnDecommissionFetchFailure` -> `spark.stage.ignoreOnDecommissionFetchFailure` ## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ## @@ -1860,8 +1867,18 @@ private[spark] class DAGScheduler( s"(attempt ${failedStage.latestInfo.attemptNumber}) running") } else { failedStage.failedAttemptIds.add(task.stageAttemptId) + val ignoreStageFailure = ignoreDecommissionFetchFailure && +isExecutorDecommissioned(taskScheduler, bmAddress) + if (ignoreStageFailure) { +logInfo("Ignoring fetch failure from $task of $failedStage attempt " + + s"${task.stageAttemptId} when count spark.stage.maxConsecutiveAttempts " + + "as executor ${bmAddress.executorId} is decommissioned and " + + s" ${config.STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE.key}=true") + } + val shouldAbortStage = -failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || +(!ignoreStageFailure && + failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts) || disallowStageRetryForTest Review Comment: QQ: We are preventing the immediate failure from aborting the stage, but might we effectively be reducing the number of stage failures that can be tolerated? For example: attempts 0, 1 and 2 fail due to decommission, then attempt 3 fails for other reasons -> the job fails (assuming maxConsecutiveStageAttempts = 4). Is this the behavior we will now exhibit?
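A small simulation of the abort condition in the quoted `DAGScheduler` diff makes the scenario in the question concrete. This is a sketch that models only `failedAttemptIds` and the `shouldAbortStage` check (it ignores `disallowStageRetryForTest` and everything else the scheduler does); `abort_attempt` and the `"decommission"`/`"other"` labels are hypothetical.

```python
from typing import List


def abort_attempt(failure_causes: List[str], max_consecutive_attempts: int,
                  ignore_decommission_fetch_failure: bool = True) -> int:
    """Return the attempt index at which the stage would abort, or -1 if it never does.

    failure_causes[i] is "decommission" or "other" for stage attempt i. As in the
    quoted diff, every failed attempt id is still recorded; only the abort check is
    skipped when the current failure comes from a decommissioned executor.
    """
    failed_attempt_ids = set()
    for attempt, cause in enumerate(failure_causes):
        failed_attempt_ids.add(attempt)
        ignore_stage_failure = ignore_decommission_fetch_failure and cause == "decommission"
        should_abort = (not ignore_stage_failure
                        and len(failed_attempt_ids) >= max_consecutive_attempts)
        if should_abort:
            return attempt
    return -1


# The scenario from the question: three decommission failures, then one other failure.
print(abort_attempt(["decommission"] * 3 + ["other"], max_consecutive_attempts=4))  # 3
```

Under this model the ignored failures still count toward `failedAttemptIds.size`, so a single non-decommission failure after three ignored ones reaches the limit and aborts the stage.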
[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies
grundprinzip commented on code in PR #37710: URL: https://github.com/apache/spark/pull/37710#discussion_r974602945 ## connect/src/main/scala/org/apache/spark/sql/sparkconnect/planner/SparkConnectPlanner.scala: ## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sparkconnect.planner + +import scala.collection.JavaConverters._ + +import org.apache.spark.annotation.Experimental +import org.apache.spark.connect.proto +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.{expressions, plans} +import org.apache.spark.sql.catalyst.analysis.{ + UnresolvedAlias, + UnresolvedAttribute, + UnresolvedFunction, + UnresolvedRelation, + UnresolvedStar Review Comment: I'm running this through `scalafmt` and checkstyle and it doesn't complain. What's the best way to solve this automatically?
[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies
grundprinzip commented on code in PR #37710: URL: https://github.com/apache/spark/pull/37710#discussion_r974599749 ## connect/src/main/scala/org/apache/spark/sql/sparkconnect/command/SparkConnectCommandPlanner.scala: ## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sparkconnect.command Review Comment: I'll change to connect.
[GitHub] [spark] AmplabJenkins commented on pull request #37928: [SPARK-40485][SQL] Extend the partitioning options of the JDBC data source
AmplabJenkins commented on PR #37928: URL: https://github.com/apache/spark/pull/37928#issuecomment-1251440593 Can one of the admins verify this patch?
[GitHub] [spark] shrprasa commented on pull request #37880: [SPARK-39399] [CORE] [K8S]: Fix proxy-user authentication for Spark on k8s in cluster deploy mode
shrprasa commented on PR #37880: URL: https://github.com/apache/spark/pull/37880#issuecomment-1251427562 @gaborgsomogyi @dongjoon-hyun @HyukjinKwon Can you please review this PR?
[GitHub] [spark] dtenedor commented on a diff in pull request #37840: [SPARK-40416][SQL] Move subquery expression CheckAnalysis error messages to use the new error framework
dtenedor commented on code in PR #37840: URL: https://github.com/apache/spark/pull/37840#discussion_r974575955 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala: ## @@ -730,6 +729,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { } } + private def canonicalizeForError(expr: LogicalPlan): String = +if (SQLConf.get.getConf(SQLConf.CANONICALIZE_PLANS_IN_ERRORS)) { Review Comment: @gengliangwang thanks for the suggestion! This is done; it is simpler now.
[GitHub] [spark] alex-balikov commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark
alex-balikov commented on code in PR #37893: URL: https://github.com/apache/spark/pull/37893#discussion_r974517188 ## python/pyspark/worker.py: ## @@ -361,6 +429,32 @@ def read_udfs(pickleSer, infile, eval_type): if eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF: ser = CogroupUDFSerializer(timezone, safecheck, assign_cols_by_name) +elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE: +soft_limit_bytes_per_batch = runner_conf.get( + "spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch", +(64 * 1024 * 1024), +) +soft_limit_bytes_per_batch = int(soft_limit_bytes_per_batch) + +min_data_count_for_sample = runner_conf.get( + "spark.sql.execution.applyInPandasWithState.minDataCountForSample", 100 Review Comment: Similar comment about the property names and default values here and everywhere else: can they be defined in a more prominent place? ## python/pyspark/worker.py: ## @@ -361,6 +429,32 @@ def read_udfs(pickleSer, infile, eval_type): if eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF: ser = CogroupUDFSerializer(timezone, safecheck, assign_cols_by_name) +elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE: +soft_limit_bytes_per_batch = runner_conf.get( + "spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch", +(64 * 1024 * 1024), Review Comment: Can the default value be defined in a more prominent place? The same goes for the property names. ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -311,6 +323,56 @@ object UnsupportedOperationChecker extends Logging { } } +// applyInPandasWithState +case m: FlatMapGroupsInPandasWithState if m.isStreaming => + // Check compatibility with output modes and aggregations in query + val aggsInQuery = collectStreamingAggregates(plan) + + if (aggsInQuery.isEmpty) { +// applyInPandasWithState without aggregation: operation's output mode must Review Comment: Why do we even have an operation output mode?
We are defining a new API; can we just drop this parameter from the API if we are going to enforce that it matches the output mode? ## python/pyspark/worker.py: ## @@ -361,6 +429,32 @@ def read_udfs(pickleSer, infile, eval_type): if eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF: ser = CogroupUDFSerializer(timezone, safecheck, assign_cols_by_name) +elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE: +soft_limit_bytes_per_batch = runner_conf.get( + "spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch", +(64 * 1024 * 1024), +) +soft_limit_bytes_per_batch = int(soft_limit_bytes_per_batch) + +min_data_count_for_sample = runner_conf.get( + "spark.sql.execution.applyInPandasWithState.minDataCountForSample", 100 +) +min_data_count_for_sample = int(min_data_count_for_sample) + +soft_timeout_millis_purge_batch = runner_conf.get( + "spark.sql.execution.applyInPandasWithState.softTimeoutPurgeBatch", 100 Review Comment: same ## python/pyspark/worker.py: ## @@ -207,6 +209,65 @@ def wrapped(key_series, value_series): return lambda k, v: [(wrapped(k, v), to_arrow_type(return_type))] +def wrap_grouped_map_pandas_udf_with_state(f, return_type): +def wrapped(key_series, value_series_gen, state): +import pandas as pd + +key = tuple(s[0] for s in key_series) + +if state.hasTimedOut: +# Timeout processing pass empty iterator. Here we return an empty DataFrame instead.
+values = [ +pd.DataFrame(columns=pd.concat(next(value_series_gen), axis=1).columns), +] +else: +values = (pd.concat(x, axis=1) for x in value_series_gen) + +result_iter = f(key, values, state) + +def verify_element(result): +if not isinstance(result, pd.DataFrame): +raise TypeError( +"The type of element in return iterator of the user-defined function " +"should be pandas.DataFrame, but is {}".format(type(result)) +) +# the number of columns of result have to match the return type +# but it is fine for result to have no columns at all if it is empty +if not ( Review Comment: if not ... ? ## python/pyspark/worker.py: ## @@ -486,6 +580,35 @@ def mapper(a): vals = [a[o] for o in parsed_offsets[0][1]]
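One way to address the reviewer's request to define the property names and defaults "in a more prominent place" is to hoist them into module-level constants read through a single helper. This is a minimal sketch of that idea, not what the PR does: the constant names and `get_int_conf` are hypothetical, while the property strings and default values are copied from the quoted diff.

```python
from typing import Dict, Mapping

# Hypothetical module-level constants collecting the property names and defaults
# that the quoted diff spells out inline in read_udfs().
SOFT_LIMIT_SIZE_PER_BATCH = "spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch"
MIN_DATA_COUNT_FOR_SAMPLE = "spark.sql.execution.applyInPandasWithState.minDataCountForSample"
SOFT_TIMEOUT_PURGE_BATCH = "spark.sql.execution.applyInPandasWithState.softTimeoutPurgeBatch"

DEFAULTS: Dict[str, int] = {
    SOFT_LIMIT_SIZE_PER_BATCH: 64 * 1024 * 1024,  # bytes
    MIN_DATA_COUNT_FOR_SAMPLE: 100,
    SOFT_TIMEOUT_PURGE_BATCH: 100,  # milliseconds, per the variable name in the diff
}


def get_int_conf(runner_conf: Mapping[str, str], key: str) -> int:
    """Read an integer setting from the runner conf, falling back to its declared default."""
    return int(runner_conf.get(key, DEFAULTS[key]))


print(get_int_conf({}, SOFT_LIMIT_SIZE_PER_BATCH))  # 67108864
print(get_int_conf({MIN_DATA_COUNT_FOR_SAMPLE: "10"}, MIN_DATA_COUNT_FOR_SAMPLE))  # 10
```

Keeping names and defaults in one table means a reader, or the JVM side, has a single place to check when the values change.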
[GitHub] [spark] roczei commented on pull request #37679: [SPARK-35242][SQL] Support changing session catalog's default database
roczei commented on PR #37679: URL: https://github.com/apache/spark/pull/37679#issuecomment-1251364841 Thanks @cloud-fan, I have implemented this and all tests passed. As far as I can see, we have resolved all of your feedback.
[GitHub] [spark] mridulm commented on pull request #37922: [WIP][SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished
mridulm commented on PR #37922: URL: https://github.com/apache/spark/pull/37922#issuecomment-1251349810 > The push-based shuffle service will auto clean up the old shuffle merge data Consider the case I mentioned above: a stage retry for an `INDETERMINATE` stage. A previous attempt's data is cleaned up only if the new attempt happens to use the same merger. Mergers from the previous attempt that are not reused by the next attempt will continue to hold stale data, with no cleanup, until the application terminates. Note: any merger that happens to be reused in the new stage attempt will clean up; I am referring to those which are not reused: old attempt `mergerLocs` `--` new attempt `mergerLocs`.
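The set of mergers the comment is concerned about is a plain set difference. A minimal sketch, assuming merger locations can be represented as host strings; `stale_mergers` and the host names are hypothetical.

```python
from typing import Set


def stale_mergers(old_merger_locs: Set[str], new_merger_locs: Set[str]) -> Set[str]:
    """Mergers used by the previous stage attempt but not by the new one.

    Per the comment, a merger reused by the new attempt cleans up the old attempt's
    data itself; only mergers in this difference keep stale data around.
    """
    return old_merger_locs - new_merger_locs


# Hypothetical host names for illustration.
print(sorted(stale_mergers({"host1", "host2", "host3"}, {"host2", "host4"})))  # ['host1', 'host3']
```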
[GitHub] [spark] ayudovin commented on a diff in pull request #37923: [SPARK-40334][PS] Implement `GroupBy.prod`
ayudovin commented on code in PR #37923: URL: https://github.com/apache/spark/pull/37923#discussion_r974514017 ## python/pyspark/pandas/groupby.py: ## @@ -993,6 +993,98 @@ def nth(self, n: int) -> FrameLike: return self._prepare_return(DataFrame(internal)) +def prod(self, numeric_only: Optional[bool] = True, min_count: int = 0): +""" +Compute prod of groups. + +Parameters +-- +numeric_only : bool, default False +Include only float, int, boolean columns. If None, will attempt to use +everything, then use only numeric data. + +min_count: int, default 0 +The required number of valid values to perform the operation. +If fewer than min_count non-NA values are present the result will be NA. + +.. versionadded:: 3.4.0 + +Returns +--- +pyspark.pandas.Series or pyspark.pandas.DataFrame + +See Also + +pyspark.pandas.Series.groupby +pyspark.pandas.DataFrame.groupby + +Examples + +>>> df = ps.DataFrame({'A': [1, 1, 2, 1, 2], +...'B': [np.nan, 2, 3, 4, 5], +...'C': [1, 2, 1, 1, 2], +...'D': [True, False, True, False, True]}) + +Groupby one column and return the prod of the remaining columns in +each group. 
+ +>>> df.groupby('A').prod().sort_index() + B C D +A +1 8.0 2 0 +2 15.0 2 11 + +>>> df.groupby('A').prod(min_count=3).sort_index() + B C D +A +1 NaN 2 0 +2 NaN NaN NaN +""" + +self._validate_agg_columns(numeric_only=numeric_only, function_name="prod") + +groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))] +internal, agg_columns, sdf = self._prepare_reduce( +groupkey_names=groupkey_names, +accepted_spark_types=(NumericType, BooleanType), +bool_to_numeric=True, +) + +psdf: DataFrame = DataFrame(internal) +if len(psdf._internal.column_labels) > 0: + +stat_exprs = [] +for label in psdf._internal.column_labels: +psser = psdf._psser_for(label) +column = psser._dtype_op.nan_to_null(psser).spark.column +data_type = psser.spark.data_type + +if isinstance(data_type, IntegralType): + stat_exprs.append(F.product(column).cast(data_type).alias(f"{label[0]}")) +else: +stat_exprs.append(F.product(column).alias(f"{label[0]}")) + +stat_exprs.append(F.count(column).alias(f"{label[0]}_count")) Review Comment: yes, you're right
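The `min_count` semantics described in the docstring (fewer than `min_count` non-NA values gives NA) can be checked with a pure-Python sketch, independent of Spark. `grouped_prod` is a hypothetical helper: it pairs a running product with a non-missing count per key, loosely mirroring the `F.product` plus `F.count` pair in the diff, and always returns floats for simplicity.

```python
import math
from typing import Dict, Hashable, Optional, Sequence, Tuple


def grouped_prod(keys: Sequence[Hashable], values: Sequence[Optional[float]],
                 min_count: int = 0) -> Dict[Hashable, float]:
    """Per-key product of non-missing values; NaN when fewer than min_count are present.

    None and NaN both count as missing, similar to nan_to_null followed by
    F.count in the diff.
    """
    acc: Dict[Hashable, Tuple[float, int]] = {}
    for key, value in zip(keys, values):
        prod, count = acc.get(key, (1.0, 0))
        if value is not None and not math.isnan(value):
            prod *= value
            count += 1
        acc[key] = (prod, count)
    return {k: (p if c >= min_count else math.nan) for k, (p, c) in acc.items()}


# Columns A, B and C of the docstring example.
A = [1, 1, 2, 1, 2]
B = [math.nan, 2, 3, 4, 5]
C = [1, 2, 1, 1, 2]
print(grouped_prod(A, B))               # {1: 8.0, 2: 15.0}
print(grouped_prod(A, C, min_count=3))  # {1: 2.0, 2: nan}
```

Group 1 of column C has three non-missing values and keeps its product, while group 2 has only two and becomes NaN, matching the B and C columns of the `min_count=3` docstring example.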
[GitHub] [spark] pralabhkumar commented on pull request #37417: [SPARK-33782][K8S][CORE]Place spark.files, spark.jars and spark.files under the current working directory on the driver in K8S cluster m
pralabhkumar commented on PR #37417: URL: https://github.com/apache/spark/pull/37417#issuecomment-1251334364 @dongjoon-hyun, I have incorporated all the review comments; please take a look.
[GitHub] [spark] AmplabJenkins commented on pull request #37930: [SPARK-40487][SQL] Make defaultJoin in BroadcastNestedLoopJoinExec running in parallel
AmplabJenkins commented on PR #37930: URL: https://github.com/apache/spark/pull/37930#issuecomment-1251324416 Can one of the admins verify this patch?
[GitHub] [spark] xiaonanyang-db opened a new pull request, #37933: SPARK-40474 Infer columns with mixed date and timestamp as String in CSV schema inference
xiaonanyang-db opened a new pull request, #37933: URL: https://github.com/apache/spark/pull/37933 ### What changes were proposed in this pull request? Adjusts part of the changes in https://github.com/apache/spark/pull/36871. That PR introduced support for the date type in CSV schema inference. The current schema inference behavior for date and time columns is: - A column containing only dates is inferred as Date type - A column containing only timestamps is inferred as Timestamp type - A column containing a mixture of dates and timestamps is inferred as Timestamp type However, we found that we were too ambitious with the last scenario: supporting it introduced a lot of code complexity and raised significant performance concerns. Thus, we want to simplify the behavior of the last scenario as follows: - A column containing a mixture of dates and timestamps is inferred as String type ### Why are the changes needed? Simplify the CSV date/time inference logic. ### Does this PR introduce _any_ user-facing change? No, compared to the previous PR. ### How was this patch tested?
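The simplified inference rule can be expressed as a pairwise merge folded over a column's values. This sketch is restricted to the three types the PR description mentions and is not Spark's `CSVInferSchema` code; the type labels and function names are hypothetical, and real inference also handles integers, decimals, booleans and other types with their own widening rules.

```python
from functools import reduce
from typing import Iterable, Optional

# Only the three types relevant to this PR are modelled here.
DATE, TIMESTAMP, STRING = "date", "timestamp", "string"


def merge_types(a: Optional[str], b: Optional[str]) -> Optional[str]:
    """Combine the types inferred for two values of the same column."""
    if a is None:
        return b
    if b is None:
        return a
    if a == b:
        return a
    # Under the simplified rule, date mixed with timestamp widens to string.
    return STRING


def infer_column_type(value_types: Iterable[str]) -> Optional[str]:
    """Fold the per-value types of a column into one column type."""
    return reduce(merge_types, value_types, None)


print(infer_column_type([DATE, DATE]))            # date
print(infer_column_type([TIMESTAMP, TIMESTAMP]))  # timestamp
print(infer_column_type([DATE, TIMESTAMP]))       # string
```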
[GitHub] [spark] xkrogen commented on a diff in pull request #37634: [SPARK-40199][SQL] Provide useful error when projecting a non-null column encounters null value
xkrogen commented on code in PR #37634: URL: https://github.com/apache/spark/pull/37634#discussion_r974499166 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala: ## @@ -252,28 +267,44 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro """.stripMargin } + /** + * Wrap `inputExpr` in a try-catch block that will catch any [[NullPointerException]] that is + * thrown, instead throwing a (more helpful) error message as provided by + * [[org.apache.spark.sql.errors.QueryExecutionErrors.valueCannotBeNullError]]. + */ + private def wrapWithNpeHandling(inputExpr: String, descPath: Seq[String]): String = +s""" + |try { + | ${inputExpr.trim} Review Comment: I prefer exception-catching, as it handles this issue with zero overhead. Adding a null-check here essentially falls back to the logic for a nullable schema: https://github.com/apache/spark/blob/0494dc90af48ce7da0625485a4dc6917a244d580/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala#L119-L133 From the benchmark results, we can see that there is nontrivial overhead for the null-check; for the simple case of a projection of a primitive, the overhead is almost 50%: https://github.com/apache/spark/blob/2a1f9767213c321bd52e7714fa3b5bfc4973ba40/sql/catalyst/benchmarks/UnsafeProjectionBenchmark-jdk17-results.txt#L9-L10 You call out the situation of a null silently being replaced with a default value; this is a good point. I'm not sure how we can handle that without the additional overhead of an explicit check. It seems that the default value replacement logic comes from [Scala's own unboxing logic](https://github.com/scala/scala/blob/986dcc160aab85298f6cab0bf8dd0345497cdc01/src/library/scala/runtime/BoxesRunTime.java#L102).
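The trade-off discussed above (catching the exception instead of emitting a null check) can be made concrete by generating the Java text in the shape of `wrapWithNpeHandling`: the input expression runs unchanged inside `try`, so no explicit per-row null check is emitted, and only a thrown `NullPointerException` is rewritten. This Python sketch is hypothetical: the real Spark helper throws the error built by `QueryExecutionErrors.valueCannotBeNullError`, and a plain `RuntimeException` with the field path stands in for it here.

```python
from typing import Sequence


def wrap_with_npe_handling(input_expr: str, desc_path: Sequence[str]) -> str:
    """Return Java source that runs input_expr and turns an NPE into a clearer error.

    Hypothetical stand-in: RuntimeException replaces the error that Spark builds
    via QueryExecutionErrors.valueCannotBeNullError.
    """
    path = ".".join(desc_path)
    return (
        "try {\n"
        f"  {input_expr.strip()}\n"
        "} catch (NullPointerException npe) {\n"
        f'  throw new RuntimeException("Null value in non-nullable field: {path}", npe);\n'
        "}\n"
    )


# Unboxing a null Integer is one way such an NPE can arise in generated code.
print(wrap_with_npe_handling("  value_1 = ((Integer) obj).intValue();  ", ["person", "age"]))
```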
[GitHub] [spark] xkrogen commented on pull request #37634: [SPARK-40199][SQL] Provide useful error when projecting a non-null column encounters null value
xkrogen commented on PR #37634: URL: https://github.com/apache/spark/pull/37634#issuecomment-1251319065 Thanks for the suggestion, @cloud-fan! Good point about there being many places where Spark trusts nullability. Here I am trying to target places where _user code_ could introduce a null. This includes data sources (including in-memory dataset creation like `sparkContext.parallelize()` or `DatasetHolder.toDS()`) and UDFs (including lambdas in calls to DF/DS APIs). User code is inherently untrustworthy, so more checks are warranted. I think any of these places where user code supplies values would be covered by this PR, since they all go through a projection before being accessed by other code paths, but let me know if you disagree. I guess one uncovered situation could be if the optimizer completely removes some operator because of the nullability, so the data is never even accessed; are you thinking about situations like this? Unfortunately, we cannot use `AssertNotNull` for this; the optimizer will remove it if the input schema indicates that the value is non-null: https://github.com/apache/spark/blob/96831bbb6749910c8f9497c048ba05e6e317649f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala#L826 And that optimizer rule is there for a good reason: there is nontrivial overhead associated with the null check, as discussed further in [my other comment](https://github.com/apache/spark/pull/37634#discussion_r974499166).
[GitHub] [spark] Yaohua628 commented on pull request #37905: [SPARK-40460][SS] Fix streaming metrics when selecting `_metadata`
Yaohua628 commented on PR #37905: URL: https://github.com/apache/spark/pull/37905#issuecomment-1251311583 > There's conflict in branch-3.3. @Yaohua628 Could you please craft a PR for branch-3.3? Thanks in advance! Done! https://github.com/apache/spark/pull/37932 - Thank you
[GitHub] [spark] Yaohua628 commented on pull request #37932: [SPARK-40460][SS][3.3] Fix streaming metrics when selecting _metadata
Yaohua628 commented on PR #37932: URL: https://github.com/apache/spark/pull/37932#issuecomment-1251310801 cc @HeartSaVioR
[GitHub] [spark] Yaohua628 opened a new pull request, #37932: [SPARK-40460][SS][3.3] Fix streaming metrics when selecting _metadata
Yaohua628 opened a new pull request, #37932: URL: https://github.com/apache/spark/pull/37932 ### What changes were proposed in this pull request? Cherry-picked from #37905. Streaming metrics report all zeros (`processedRowsPerSecond`, etc.) when selecting the `_metadata` column, because the logical plan from the batch and the actually planned logical plan are mismatched. As a result, [here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L348) we cannot find the plan and collect metrics correctly. This PR fixes this by replacing the initial `LogicalPlan` with the `LogicalPlan` containing the metadata column. ### Why are the changes needed? Bug fix. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing + new UTs.
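The PR description explains why the metrics read zero. Here is a simplified, Spark-free model of that failure. It assumes, as the description suggests, that rows are attributed to a source by looking up leaves of the executed batch plan in a map keyed by the plan registered for that source. `Leaf` and `MetricsLookup` are hypothetical. In this model, registering the plan that already contains the metadata column is what lets the lookup succeed.

```scala
// Hypothetical, Spark-free model: a leaf of a batch's logical plan.
final case class Leaf(source: String, output: Seq[String])

object MetricsLookup {
  // Attributes output rows to sources by looking up each executed leaf in a
  // map keyed by the leaf plan that was registered for that source.
  // A leaf that is not found contributes nothing, so its source reports 0.
  def rowsPerSource(
      registered: Map[Leaf, String],
      executedLeaves: Seq[(Leaf, Long)]): Map[String, Long] = {
    val matched: Seq[(String, Long)] = executedLeaves.flatMap {
      case (leaf, rows) => registered.get(leaf).map(source => source -> rows)
    }
    registered.values.map { source =>
      source -> matched.filter(_._1 == source).map(_._2).sum
    }.toMap
  }
}
```

With the original plan registered, the rewritten leaf (which now outputs `_metadata`) is not found and the source reports 0 rows. With the rewritten plan registered, the rows are attributed correctly.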
[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #37924: [SPARK-40481][CORE] Ignore stage fetch failure caused by decommissioned executor
dongjoon-hyun commented on code in PR #37924: URL: https://github.com/apache/spark/pull/37924#discussion_r974491025 ## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ## @@ -2159,6 +2171,16 @@ private[spark] class DAGScheduler( } } + private def isExecutorDecommissioned(bmAddress: BlockManagerId) = { Review Comment: If you don't mind, please add `taskScheduler` as a parameter here and use it inside the function body.
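The review asks for the scheduler to become a parameter of the helper. Below is a sketch of that refactoring. `BlockManagerAddress` and `DecommissionStateSource` are hypothetical stand-ins for Spark's `BlockManagerId` and `TaskScheduler`. A real version would presumably query the scheduler's executor decommission state; recent Spark versions expose `getExecutorDecommissionState` on `TaskScheduler`.

```scala
// Hypothetical stand-in for Spark's BlockManagerId (only the executor id is modeled).
final case class BlockManagerAddress(executorId: String)

// Hypothetical stand-in for the part of the task scheduler that tracks decommissioning.
trait DecommissionStateSource {
  // Some(state) while the executor is decommissioning or decommissioned, else None.
  def decommissionState(executorId: String): Option[String]
}

object DecommissionCheck {
  // The scheduler is an explicit parameter, as the review suggests, so the helper
  // reads no enclosing-class state and can be tested with a stub scheduler.
  // A null address is treated as "not decommissioned".
  def isExecutorDecommissioned(
      scheduler: DecommissionStateSource,
      bmAddress: BlockManagerAddress): Boolean =
    bmAddress != null && scheduler.decommissionState(bmAddress.executorId).nonEmpty
}
```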
[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #37924: [SPARK-40481][CORE] Ignore stage fetch failure caused by decommissioned executor
dongjoon-hyun commented on code in PR #37924: URL: https://github.com/apache/spark/pull/37924#discussion_r974490653 ## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ## @@ -1860,8 +1863,17 @@ private[spark] class DAGScheduler( s"(attempt ${failedStage.latestInfo.attemptNumber}) running") } else { failedStage.failedAttemptIds.add(task.stageAttemptId) + val ignoreStageFailure = ignoreDecommissionFetchFailure && +isExecutorDecommissioned(bmAddress) + if (ignoreStageFailure) { +logInfo("Ignoring fetch failure from $task of $failedStage attempt" + Review Comment: This log message might be misleading or wrong when `disallowStageRetryForTest` is true.
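The reviewer's concern is that the log message can be wrong. One way to keep it accurate is to decide the outcome first and log "Ignoring fetch failure" only when a retry will actually happen. This sketch assumes that when `disallowStageRetryForTest` is true the stage is aborted rather than retried. `FetchFailureDecision` and its outcomes are hypothetical. The `logInfo` string as quoted has no `s` prefix, so `$task` and `$failedStage` would be printed literally; the sketch uses `s"..."` interpolation.

```scala
object FetchFailureDecision {
  // Hypothetical outcomes for a fetch failure on a stage.
  sealed trait Outcome
  case object AbortStage extends Outcome
  case object IgnoreAndRetry extends Outcome
  case object CountAndRetry extends Outcome

  // If retries are disallowed, the stage is aborted regardless of decommission
  // state, so the failure must not be reported as "ignored".
  def decide(
      disallowStageRetryForTest: Boolean,
      ignoreDecommissionFetchFailure: Boolean,
      executorDecommissioned: Boolean): Outcome =
    if (disallowStageRetryForTest) AbortStage
    else if (ignoreDecommissionFetchFailure && executorDecommissioned) IgnoreAndRetry
    else CountAndRetry

  // Produces the "ignoring" message only for the outcome where it is true.
  def logMessage(outcome: Outcome, task: String, stage: String): Option[String] =
    outcome match {
      case IgnoreAndRetry =>
        Some(s"Ignoring fetch failure from $task of $stage due to executor decommission")
      case _ => None
    }
}
```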
[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #37924: [SPARK-40481][CORE] Ignore stage fetch failure caused by decommissioned executor
dongjoon-hyun commented on code in PR #37924: URL: https://github.com/apache/spark/pull/37924#discussion_r974486551 ## docs/configuration.md: ## @@ -2605,6 +2605,15 @@ Apart from these, the following properties are also available, and may be useful 2.2.0 + + spark.stage.attempt.ignoreOnDecommissionFetchFailure + false + +Whether ignore stage fetch failure caused by executor decommission when Review Comment: Please remove the trailing space at the end of this line.