[GitHub] [spark] sadikovi commented on a diff in pull request #37933: [SPARK-40474][SQL] Infer columns with mixed date and timestamp as String in CSV schema inference

2022-09-19 Thread GitBox


sadikovi commented on code in PR #37933:
URL: https://github.com/apache/spark/pull/37933#discussion_r974908544


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala:
##
@@ -224,7 +223,7 @@ class UnivocityParser(
   case NonFatal(e) =>
 // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
 // compatibility if enabled.
-if (!enableParsingFallbackForTimestampType) {
+if (!enableParsingFallbackForDateType) {

Review Comment:
   @xiaonanyang-db Does it mean that we don't have test coverage for it? I am 
wondering how I managed to merge a PR like this without a unit test. I will
open a follow-up PR to improve testing.






[GitHub] [spark] xiaonanyang-db commented on a diff in pull request #37933: [SPARK-40474][SQL] Infer columns with mixed date and timestamp as String in CSV schema inference

2022-09-19 Thread GitBox


xiaonanyang-db commented on code in PR #37933:
URL: https://github.com/apache/spark/pull/37933#discussion_r974909141


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala:
##
@@ -224,7 +223,7 @@ class UnivocityParser(
   case NonFatal(e) =>
 // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
 // compatibility if enabled.
-if (!enableParsingFallbackForTimestampType) {
+if (!enableParsingFallbackForDateType) {

Review Comment:
   That will be nice!









[GitHub] [spark] xiaonanyang-db commented on pull request #37933: [SPARK-40474][SQL] Infer columns with mixed date and timestamp as String in CSV schema inference

2022-09-19 Thread GitBox


xiaonanyang-db commented on PR #37933:
URL: https://github.com/apache/spark/pull/37933#issuecomment-1251869330

   > Can you update the description to list all of the semantics of the change? 
You can remove the point where we need to merge them to TimestampType if this 
is not what the PR implements and replace it with "merging to StringType" 
instead.
   > 
   > Is it correct that the date inference is still controlled by "prefersDate"?
   
   Sure!
   Yes, it's still controlled by "prefersDate".
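   
   For reference, a minimal, hypothetical sketch of the inference semantics discussed in this thread (assuming the `prefersDate` option name referenced above; this is not code from the PR's test suite):
   ```
   # A CSV column mixing date-only and timestamp values is inferred as StringType
   # instead of being merged into TimestampType.
   from pyspark.sql import SparkSession

   spark = SparkSession.builder.getOrCreate()

   # Two rows for a single column: a date and a timestamp.
   lines = spark.sparkContext.parallelize(["2022-01-01", "2022-01-01 12:34:56"])

   df = (spark.read
         .option("inferSchema", "true")
         .option("prefersDate", "true")  # option name as referenced in this thread
         .csv(lines))

   df.printSchema()  # expected per this PR: the column is inferred as string
   ```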





[GitHub] [spark] LuciferYang commented on pull request #37940: [SPARK-40494][CORE][SQL][MLLIB] Optimize the performance of `keys.zipWithIndex.toMap` code pattern

2022-09-19 Thread GitBox


LuciferYang commented on PR #37940:
URL: https://github.com/apache/spark/pull/37940#issuecomment-1251869079

   Tested the following code with input sizes `1,5,10,20,50,100,150,200,300,400,500,1000,5000,1,2`:
   ```
   def testZipWithIndexToMap(valuesPerIteration: Int, collectionSize: Int): Unit = {
     val benchmark = new Benchmark(
       s"Test zip with index to map with collectionSize = $collectionSize",
       valuesPerIteration,
       output = output)

     val data = 0 until collectionSize

     benchmark.addCase("Use zipWithIndex + toMap") { _: Int =>
       for (_ <- 0L until valuesPerIteration) {
         val map: Map[Int, Int] = data.zipWithIndex.toMap
       }
     }

     benchmark.addCase("Use zipWithIndex + collection.breakOut") { _: Int =>
       for (_ <- 0L until valuesPerIteration) {
         val map: Map[Int, Int] =
           data.zipWithIndex(collection.breakOut[IndexedSeq[Int], (Int, Int), Map[Int, Int]])
       }
     }

     benchmark.addCase("Use Manual builder") { _: Int =>
       for (_ <- 0L until valuesPerIteration) {
         val map: Map[Int, Int] = zipToMapUseMapBuilder[Int](data)
       }
     }

     benchmark.addCase("Use Manual map") { _: Int =>
       for (_ <- 0L until valuesPerIteration) {
         val map: Map[Int, Int] = zipWithIndexToMapUseMap[Int](data)
       }
     }
     benchmark.run()
   }

   private def zipToMapUseMapBuilder[K](keys: Iterable[K]): Map[K, Int] = {
     import scala.collection.immutable
     val builder = immutable.Map.newBuilder[K, Int]
     val keyIter = keys.iterator
     var idx = 0
     while (keyIter.hasNext) {
       builder += (keyIter.next(), idx).asInstanceOf[(K, Int)]
       idx = idx + 1
     }
     builder.result()
   }

   private def zipWithIndexToMapUseMap[K](keys: Iterable[K]): Map[K, Int] = {
     var elems: Map[K, Int] = Map.empty[K, Int]
     val keyIter = keys.iterator
     var idx = 0
     while (keyIter.hasNext) {
       elems += (keyIter.next().asInstanceOf[K] -> idx)
       idx = idx + 1
     }
     elems
   }
   ```
   
   Results are as follows:
   
   **Java 8**
   
   ```
   OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1019-azure
   Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
   Test zip with index to map with collectionSize = 1:   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
   -----------------------------------------------------------------------------------------------------------------------------------
   Use zipWithIndex + toMap                                         41             43          3        2.5         406.8       1.0X
   Use zipWithIndex + collection.breakOut                            4              4          0       23.6          42.4       9.6X
   Use Manual builder                                                4              4          0       27.8          35.9      11.3X
   Use Manual map                                                    3              3          0       37.4          26.8      15.2X

   OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1019-azure
   Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
   Test zip with index to map with collectionSize = 5:   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
   -----------------------------------------------------------------------------------------------------------------------------------
   Use zipWithIndex + toMap                                        142            143          2        0.7        1421.2       1.0X
   Use zipWithIndex + collection.breakOut                          101            102          1        1.0        1011.0       1.4X
   Use Manual builder                                               99            101          2        1.0         994.0       1.4X
   Use Manual map                                                   49             49          1        2.1         485.6       2.9X

   OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1019-azure
   Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
   Test zip with index to map with collectionSize = 10:  Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
   -----------------------------------------------------------------------------------------------------------------------------------
   Use zipWithIndex + toMap                                        166            170          5        0.6        1660.0       1.0X
   Use zipWithIndex + collection.breakOut                          123            128          5        0.8        1226.3       1.4X
   Use Manual builder                                              121            123          3        0.8        1207.9       1.4X
   Use Manual map                                                  102            104          3
   ```

[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


HeartSaVioR commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974906420


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -311,6 +323,56 @@ object UnsupportedOperationChecker extends Logging {
 }
   }
 
+// applyInPandasWithState

Review Comment:
   NOTE: this is just a copy and paste of flatMapGroupsWithState since the characteristics are the same for both.






[GitHub] [spark] sadikovi commented on a diff in pull request #37933: [SPARK-40474][SQL] Infer columns with mixed date and timestamp as String in CSV schema inference

2022-09-19 Thread GitBox


sadikovi commented on code in PR #37933:
URL: https://github.com/apache/spark/pull/37933#discussion_r974905472


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala:
##
@@ -224,7 +223,7 @@ class UnivocityParser(
   case NonFatal(e) =>
 // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
 // compatibility if enabled.
-if (!enableParsingFallbackForTimestampType) {
+if (!enableParsingFallbackForDateType) {

Review Comment:
   Thanks for fixing it, the values were swapped.






[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


HeartSaVioR commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974905452


##
python/pyspark/worker.py:
##
@@ -207,6 +209,65 @@ def wrapped(key_series, value_series):
 return lambda k, v: [(wrapped(k, v), to_arrow_type(return_type))]
 
 
+def wrap_grouped_map_pandas_udf_with_state(f, return_type):
+def wrapped(key_series, value_series_gen, state):
+import pandas as pd
+
+key = tuple(s[0] for s in key_series)
+
+if state.hasTimedOut:
+# Timeout processing pass empty iterator. Here we return an empty 
DataFrame instead.
+values = [
+pd.DataFrame(columns=pd.concat(next(value_series_gen), 
axis=1).columns),
+]
+else:
+values = (pd.concat(x, axis=1) for x in value_series_gen)
+
+result_iter = f(key, values, state)
+
+def verify_element(result):
+if not isinstance(result, pd.DataFrame):
+raise TypeError(
+"The type of element in return iterator of the 
user-defined function "
+"should be pandas.DataFrame, but is 
{}".format(type(result))
+)
+# the number of columns of result have to match the return type
+# but it is fine for result to have no columns at all if it is 
empty
+if not (

Review Comment:
   This is borrowed from the function above - I think we took `if not` here because it's more intuitive and easier to think of the "valid" case and apply "not" to reverse it, rather than manually converting the condition into its contrapositive.






[GitHub] [spark] dongjoon-hyun commented on pull request #37729: Revert "[SPARK-33861][SQL] Simplify conditional in predicate"

2022-09-19 Thread GitBox


dongjoon-hyun commented on PR #37729:
URL: https://github.com/apache/spark/pull/37729#issuecomment-1251865262

   Thank you, @wangyum .





[GitHub] [spark] LuciferYang opened a new pull request, #37940: [SPARK-40494][CORE][SQL][MLLIB] Optimize the performance of `keys.zipWithIndex.toMap` code pattern

2022-09-19 Thread GitBox


LuciferYang opened a new pull request, #37940:
URL: https://github.com/apache/spark/pull/37940

   ### What changes were proposed in this pull request?
   Similar to https://github.com/apache/spark/pull/37876, this PR introduces a new toMap method in `o.a.spark.util.collection.Utils` that uses a manual while-loop style to optimize the performance of the `keys.zipWithIndex.toMap` code pattern in Spark.
   
   
   ### Why are the changes needed?
   Performance improvement
   
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Pass GitHub Actions





[GitHub] [spark] wangyum commented on pull request #37729: Revert "[SPARK-33861][SQL] Simplify conditional in predicate"

2022-09-19 Thread GitBox


wangyum commented on PR #37729:
URL: https://github.com/apache/spark/pull/37729#issuecomment-1251857340

   OK. https://issues.apache.org/jira/browse/SPARK-40493





[GitHub] [spark] beliefer commented on pull request #37937: [SPARK-40491][SQL] Expose a jdbcRDD function in SparkContext

2022-09-19 Thread GitBox


beliefer commented on PR #37937:
URL: https://github.com/apache/spark/pull/37937#issuecomment-1251853764

   > Hm, why do we need this? Can't we do `spark.read.jdbc(...).rdd` or `toDS`?
   
   I know. This PR just follows the legacy documentation of `JdbcRDD`. If we don't need the change, we may need to update the comments.





[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #37924: [SPARK-40481][CORE] Ignore stage fetch failure caused by decommissioned executor

2022-09-19 Thread GitBox


dongjoon-hyun commented on code in PR #37924:
URL: https://github.com/apache/spark/pull/37924#discussion_r974893716


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -2221,6 +2221,14 @@ package object config {
   .checkValue(_ >= 0, "needs to be a non-negative value")
   .createWithDefault(5)
 
+  private[spark] val STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE =
+ConfigBuilder("spark.stage.ignoreOnDecommissionFetchFailure")

Review Comment:
   Shall we remove the redundant `On`? It would be better because the variable name already has no `On`.
   ```
   - ignoreOnDecommissionFetchFailure
   + ignoreDecommissionFetchFailure
   ```









[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #37934: [SPARK-40477][SQL] Support `NullType` in `ColumnarBatchRow`

2022-09-19 Thread GitBox


dongjoon-hyun commented on code in PR #37934:
URL: https://github.com/apache/spark/pull/37934#discussion_r974891717


##
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala:
##
@@ -1461,10 +1462,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
   vectorizedReader.initBatch(schema, partitionValues)
   vectorizedReader.nextKeyValue()
   val row = vectorizedReader.getCurrentValue.asInstanceOf[InternalRow]
-
-  // Use `GenericMutableRow` by explicitly copying rather than 
`ColumnarBatch`
-  // in order to use get(...) method which is not implemented in 
`ColumnarBatch`.
-  val actual = row.copy().get(1, dt)

Review Comment:
   Well, if this is a specifically required change, please make it in a separate PR, @kazuyukitanimura.






[GitHub] [spark] cloud-fan commented on a diff in pull request #37679: [SPARK-35242][SQL] Support changing session catalog's default database

2022-09-19 Thread GitBox


cloud-fan commented on code in PR #37679:
URL: https://github.com/apache/spark/pull/37679#discussion_r974888079


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala:
##
@@ -48,9 +48,6 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.{CaseInsensitiveStringMap, PartitioningUtils}
 import org.apache.spark.util.Utils
 
-object SessionCatalog {
-  val DEFAULT_DATABASE = "default"

Review Comment:
   can we still keep it so that we don't need to hardcode "default"?






[GitHub] [spark] dongjoon-hyun commented on pull request #36096: [SPARK-38803][K8S][TESTS] Lower minio cpu to 250m (0.25) from 1 in K8s IT

2022-09-19 Thread GitBox


dongjoon-hyun commented on PR #36096:
URL: https://github.com/apache/spark/pull/36096#issuecomment-1251844564

   This test commit is backported to branch-3.3 according to the community 
request, https://github.com/apache/spark/pull/36087#issuecomment-1251757187 .





[GitHub] [spark] dongjoon-hyun commented on pull request #36087: [SPARK-38802][K8S][TESTS] Add Support for `spark.kubernetes.test.(driver|executor)RequestCores`

2022-09-19 Thread GitBox


dongjoon-hyun commented on PR #36087:
URL: https://github.com/apache/spark/pull/36087#issuecomment-1251843627

   Sure, @Yikun .
   
   This test commit is backported to branch-3.3 according to the community 
request.





[GitHub] [spark] sunpe commented on pull request #33154: [SPARK-35949][CORE]Add `keep-spark-context-alive` arg for to prevent closing spark context after invoking main for some case

2022-09-19 Thread GitBox


sunpe commented on PR #33154:
URL: https://github.com/apache/spark/pull/33154#issuecomment-1251836832

   > Hello @sunpe, thank you for your very fast answer.
   > 
   > Please let me give you some more context, I am using Spark v3.3.0 in K8s 
using [Spark on K8S 
operator](https://github.com/GoogleCloudPlatform/spark-on-k8s-operator). My 
Spark driver is a Spring Boot application, very similar to the situation you 
described above, but instead of starting a web server, I subscribe to a 
RabbitMQ queue. Basically, I would like to have the `SparkSession` available as 
`@Bean` in my services.
   > 
   > I tried the following:
   > 
   > 1. Activate `--verbose`
   > 2. Rebuild from source (v3.3.0), adding a simple `logWarning` before ` 
SparkContext.getActive.foreach(_.stop())`
   > 
   > Here are my findings:
   > 
   > Spark Operator pod submits the application using this command:
   > 
   > ```
   > submission.go:65] spark-submit arguments: [/opt/spark/bin/spark-submit
   > --class org.test.myapplication.MyApplication
   > --master k8s://https://10.32.0.1:443
   > --deploy-mode cluster
   > --conf spark.kubernetes.namespace=my-app-namespace
   > --conf spark.app.name=my-application
   > --conf spark.kubernetes.driver.pod.name=my-application-driver
   > --conf spark.kubernetes.container.image=repo/my-application:latest
   > --conf spark.kubernetes.container.image.pullPolicy=Always
   > --conf spark.kubernetes.submission.waitAppCompletion=false
   > --conf 
spark.kubernetes.driver.label.sparkoperator.k8s.io/app-name=my-application
   > --conf 
spark.kubernetes.driver.label.sparkoperator.k8s.io/launched-by-spark-operator=true
   > --conf 
spark.kubernetes.driver.label.sparkoperator.k8s.io/submission-id=2b225916-cc12-47cc-a898-8549301fdce4
   > --conf spark.driver.cores=1
   > --conf spark.kubernetes.driver.request.cores=200m
   > --conf spark.driver.memory=512m
   > --conf 
spark.kubernetes.authenticate.driver.serviceAccountName=spark-operator-spark
   > --conf 
spark.kubernetes.executor.label.sparkoperator.k8s.io/app-name=my-application
   > --conf 
spark.kubernetes.executor.label.sparkoperator.k8s.io/launched-by-spark-operator=true
   > --conf 
spark.kubernetes.executor.label.sparkoperator.k8s.io/submission-id=2b225916-cc12-47cc-a898-8549301fdce4
   > --conf spark.executor.instances=1
   > --conf spark.executor.cores=1
   > --conf spark.executor.memory=512m
   > --conf 
spark.kubernetes.executor.label.app.kubernetes.io/instance=my-app-namespace-my-application
   > local:///opt/spark/work-dir/my-application-0.0.1-SNAPSHOT-all.jar]
   > ```
   > 
   > When Driver pod starts, I have the following logs:
   > 
   > ```
   > (spark.driver.memory,512m)
   > (spark.driver.port,7078)
   > (spark.executor.cores,1)
   > (spark.executor.instances,1)
   > (spark.executor.memory,512m)
   > […]
   > (spark.kubernetes.resource.type,java)
   > (spark.kubernetes.submission.waitAppCompletion,false)
   > (spark.kubernetes.submitInDriver,true)
   > (spark.master,k8s://https://10.32.0.1:443)
   > ```
   > 
   > As you can see, `args.master` starts with `k8s`. Once the application is started and the `main()` thread is released, my custom log is printed, `SparkContext` is closed, and the executor is stopped. As I understand from the source code, my primary resource is not `spark-shell`, and the main class is neither `org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver` nor `org.apache.spark.sql.hive.thriftserver.HiveThriftServer2`. It produces the following logs:
   > 
   > ```
   > 2022-09-19 13:05:18.621 INFO  30 --- [   main] 
o.s.a.r.c.CachingConnectionFactory   : Created new connection: 
rabbitConnectionFactory#1734b1a:0/SimpleConnection@66859ea9 
[delegate=amqp://user@100.64.2.141:5672/, localPort= 49038]
   > 2022-09-19 13:05:18.850 INFO  30 --- [   main] MyApplication : 
Started MyApplication in 18.787 seconds (JVM running for 23.051)
   > Warning: SparkContext is going to be stopped!
   > 2022-09-19 13:05:18.903 INFO  30 --- [   main] 
o.s.j.s.AbstractConnector: Stopped Spark@45d389f2{HTTP/1.1, 
(http/1.1)}{0.0.0.0:4040}
   > 2022-09-19 13:05:18.914 INFO  30 --- [   main] o.a.s.u.SparkUI 
 : Stopped Spark web UI at 
http://my-app-ee12f78355d97dc2-driver-svc.my-app-namespace.svc:4040
   > 2022-09-19 13:05:18.927 INFO  30 --- [   main] 
.s.c.k.KubernetesClusterSchedulerBackend : Shutting down all executors
   > 2022-09-19 13:05:18.928 INFO  30 --- [rainedScheduler] 
chedulerBackend$KubernetesDriverEndpoint : Asking each executor to shut down
   > 2022-09-19 13:05:18.938 DEBUG 30 --- [ rpc-server-4-1] 
o.a.s.n.u.NettyLogger: [id: 0x07e91ece, 
L:/100.64.15.15:7078 - R:/100.64.15.215:40996] WRITE: MessageWithHeader 
[headerLength: 13, bodyLength: 198]
   > 2022-09-19 13:05:18.939 DEBUG 30 --- [ rpc-server-4-1] 
o.a.s.n.u.NettyLogger: [id: 0x07e91ece, 
L:/100.64.15.15:7078 - R:/100.64.15.215:40996] 

[GitHub] [spark] HyukjinKwon opened a new pull request, #37939: [MINOR][DOCS][PYTHON] Document datetime.timedelta <> DayTimeIntervalType

2022-09-19 Thread GitBox


HyukjinKwon opened a new pull request, #37939:
URL: https://github.com/apache/spark/pull/37939

   ### What changes were proposed in this pull request?
   
   This PR proposes to document datetime.timedelta support in PySpark on the SQL data type reference page. This support was added in SPARK-37275.
   
   ### Why are the changes needed?
   
   To show the support of datetime.timedelta.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, this fixes the documentation.
   
   ### How was this patch tested?
   
   CI in this PR should validate the build.
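   
   As a quick illustration of the mapping being documented, a minimal sketch (assuming a running `SparkSession`; the snippet is not part of the PR itself):
   ```
   # datetime.timedelta values map to DayTimeIntervalType and are collected back
   # as datetime.timedelta.
   import datetime
   from pyspark.sql import SparkSession

   spark = SparkSession.builder.getOrCreate()

   df = spark.createDataFrame([(datetime.timedelta(days=1, hours=2),)], ["interval"])
   df.printSchema()                      # the column is inferred as DayTimeIntervalType
   print(type(df.first()["interval"]))   # <class 'datetime.timedelta'>
   ```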





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


HyukjinKwon commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974870249


##
python/pyspark/sql/pandas/group_ops.py:
##
@@ -216,6 +218,105 @@ def applyInPandas(
 jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
 return DataFrame(jdf, self.session)
 
+def applyInPandasWithState(
+self,
+func: "PandasGroupedMapFunctionWithState",
+outputStructType: Union[StructType, str],
+stateStructType: Union[StructType, str],
+outputMode: str,
+timeoutConf: str,
+) -> DataFrame:
+"""
+Applies the given function to each group of data, while maintaining a 
user-defined
+per-group state. The result Dataset will represent the flattened 
record returned by the
+function.
+
+For a streaming Dataset, the function will be invoked for each group 
repeatedly in every
+trigger, and updates to each group's state will be saved across 
invocations. The function
+will also be invoked for each timed-out state repeatedly. The sequence 
of the invocation
+will be input data -> state timeout. When the function is invoked for 
state timeout, there
+will be no data being presented.
+
+The function should takes parameters (key, 
Iterator[`pandas.DataFrame`], state) and
+returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will 
be passed as a tuple
+of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The 
state will be passed as
+:class:`pyspark.sql.streaming.state.GroupStateImpl`.
+
+For each group, all columns are passed together as `pandas.DataFrame` 
to the user-function,
+and the returned `pandas.DataFrame` across all invocations are 
combined as a
+:class:`DataFrame`. Note that the user function should loop through 
and process all
+elements in the iterator. The user function should not make a guess of 
the number of
+elements in the iterator.
+
+The `outputStructType` should be a :class:`StructType` describing the 
schema of all
+elements in returned value, `pandas.DataFrame`. The column labels of 
all elements in
+returned value, `pandas.DataFrame` must either match the field names 
in the defined
+schema if specified as strings, or match the field data types by 
position if not strings,
+e.g. integer indices.
+
+The `stateStructType` should be :class:`StructType` describing the 
schema of user-defined
+state. The value of state will be presented as a tuple, as well as the 
update should be
+performed with the tuple. User defined types e.g. native Python class 
types are not

Review Comment:
   I think we can just say that "the corresponding Python types for :class:`DataType` are supported". It is documented here:
https://spark.apache.org/docs/latest/sql-ref-datatypes.html (click python tab)
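   
   For context, a minimal usage sketch of the API described by the docstring above (a hypothetical per-key count; the module path, the `GroupStateTimeout` constant, and the streaming DataFrame `df` are assumptions based on this PR and may differ in the final API):
   ```
   # Hypothetical sketch of applyInPandasWithState based on the docstring in this diff.
   import pandas as pd
   from pyspark.sql.streaming.state import GroupStateTimeout

   output_schema = "word STRING, count LONG"
   state_schema = "count LONG"

   def count_words(key, pdf_iter, state):
       # The state value is exchanged as a tuple matching state_schema.
       count = state.get[0] if state.exists else 0
       for pdf in pdf_iter:           # loop through all chunks for this key
           count += len(pdf)
       state.update((count,))
       yield pd.DataFrame({"word": [key[0]], "count": [count]})

   # df is assumed to be a streaming DataFrame with a "word" column.
   result = df.groupBy("word").applyInPandasWithState(
       count_words, output_schema, state_schema, "Update", GroupStateTimeout.NoTimeout)
   ```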






[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


HeartSaVioR commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r97486


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.ipc.ArrowStreamWriter
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.api.python._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.api.python.PythonSQLUtils
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.python.ApplyInPandasWithStatePythonRunner.{InType,
 OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER}
+import 
org.apache.spark.sql.execution.python.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA
+import org.apache.spark.sql.execution.streaming.GroupStateImpl
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
+
+
+/**
+ * A variant implementation of [[ArrowPythonRunner]] to serve the operation
+ * applyInPandasWithState.
+ */
+class ApplyInPandasWithStatePythonRunner(
+funcs: Seq[ChainedPythonFunctions],
+evalType: Int,
+argOffsets: Array[Array[Int]],
+inputSchema: StructType,
+override protected val timeZoneId: String,
+initialWorkerConf: Map[String, String],
+stateEncoder: ExpressionEncoder[Row],
+keySchema: StructType,
+valueSchema: StructType,
+stateValueSchema: StructType,
+softLimitBytesPerBatch: Long,
+minDataCountForSample: Int,
+softTimeoutMillsPurgeBatch: Long)
+  extends BasePythonRunner[InType, OutType](funcs, evalType, argOffsets)
+  with PythonArrowInput[InType]
+  with PythonArrowOutput[OutType] {
+
+  override protected val schema: StructType = inputSchema.add("__state", 
STATE_METADATA_SCHEMA)
+
+  override val simplifiedTraceback: Boolean = 
SQLConf.get.pysparkSimplifiedTraceback
+
+  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+  require(
+bufferSize >= 4,

Review Comment:
   We can do both: set it to 4 if it's less than 4, with a warning log that users are encouraged to set it higher.






[GitHub] [spark] HyukjinKwon closed pull request #37932: [SPARK-40460][SS][3.3] Fix streaming metrics when selecting _metadata

2022-09-19 Thread GitBox


HyukjinKwon closed pull request #37932: [SPARK-40460][SS][3.3] Fix streaming 
metrics when selecting _metadata
URL: https://github.com/apache/spark/pull/37932





[GitHub] [spark] HyukjinKwon commented on pull request #37932: [SPARK-40460][SS][3.3] Fix streaming metrics when selecting _metadata

2022-09-19 Thread GitBox


HyukjinKwon commented on PR #37932:
URL: https://github.com/apache/spark/pull/37932#issuecomment-1251799957

   Merged to branch-3.3.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


HeartSaVioR commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974858058


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.ipc.ArrowStreamWriter
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.api.python._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.api.python.PythonSQLUtils
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.python.ApplyInPandasWithStatePythonRunner.{InType,
 OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER}
+import 
org.apache.spark.sql.execution.python.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA
+import org.apache.spark.sql.execution.streaming.GroupStateImpl
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
+
+
+/**
+ * A variant implementation of [[ArrowPythonRunner]] to serve the operation
+ * applyInPandasWithState.
+ */
+class ApplyInPandasWithStatePythonRunner(
+funcs: Seq[ChainedPythonFunctions],
+evalType: Int,
+argOffsets: Array[Array[Int]],
+inputSchema: StructType,
+override protected val timeZoneId: String,
+initialWorkerConf: Map[String, String],
+stateEncoder: ExpressionEncoder[Row],
+keySchema: StructType,
+valueSchema: StructType,
+stateValueSchema: StructType,
+softLimitBytesPerBatch: Long,
+minDataCountForSample: Int,
+softTimeoutMillsPurgeBatch: Long)
+  extends BasePythonRunner[InType, OutType](funcs, evalType, argOffsets)
+  with PythonArrowInput[InType]
+  with PythonArrowOutput[OutType] {
+
+  override protected val schema: StructType = inputSchema.add("__state", 
STATE_METADATA_SCHEMA)
+
+  override val simplifiedTraceback: Boolean = 
SQLConf.get.pysparkSimplifiedTraceback
+
+  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+  require(
+bufferSize >= 4,

Review Comment:
   Never mind. I just talked with @HyukjinKwon and understood how the buffer works (I misunderstood) - it's more about how many small Arrow RecordBatches to buffer and flush at once for efficiency. An Arrow RecordBatch bigger than the buffer will still be treated as a single Arrow RecordBatch.









[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


HeartSaVioR commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974856402


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStateWriter.scala:
##
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector.{FieldVector, VectorSchemaRoot}
+import org.apache.arrow.vector.ipc.ArrowStreamWriter
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.api.python.PythonSQLUtils
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeRow}
+import org.apache.spark.sql.execution.arrow.ArrowWriter
+import org.apache.spark.sql.execution.arrow.ArrowWriter.createFieldWriter
+import org.apache.spark.sql.execution.streaming.GroupStateImpl
+import org.apache.spark.sql.types.{BinaryType, BooleanType, IntegerType, 
StringType, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * This class abstracts the complexity on constructing Arrow RecordBatches for 
data and state with
+ * bin-packing and chunking. The caller only need to call the proper public 
methods of this class
+ * `startNewGroup`, `writeRow`, `finalizeGroup`, `finalizeData` and this class 
will write the data
+ * and state into Arrow RecordBatches with performing bin-pack and chunk 
internally.
+ *
+ * This class requires that the parameter `root` has initialized with the 
Arrow schema like below:
+ * - data fields
+ * - state field
+ *   - nested schema (Refer ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA)
+ *
+ * Please refer the code comment in the implementation to see how the writes 
of data and state
+ * against Arrow RecordBatch work with consideration of bin-packing and 
chunking.
+ */
+class ApplyInPandasWithStateWriter(
+root: VectorSchemaRoot,
+writer: ArrowStreamWriter,
+softLimitBytesPerBatch: Long,
+minDataCountForSample: Int,
+softTimeoutMillsPurgeBatch: Long) {
+
+  import ApplyInPandasWithStateWriter._
+
+  // We logically group the columns by family (data vs state) and initialize 
writer separately,
+  // since it's lot more easier and probably performant to write the row 
directly rather than
+  // projecting the row to match up with the overall schema.
+  //
+  // The number of data rows and state metadata rows can be different which 
could be problematic

Review Comment:
   We use a single Arrow RecordBatch for both data and state - I'll mention this, and also the rationale, explicitly in the code comment.






[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


HeartSaVioR commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974854298


##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -2705,6 +2705,44 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH =
+
buildConf("spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch")
+  .internal()
+  .doc("When using applyInPandasWithState, set a soft limit of the 
accumulated size of " +
+"records that can be written to a single ArrowRecordBatch in memory. 
This is used to " +
+"restrict the amount of memory being used to materialize the data in 
both executor and " +
+"Python worker. The accumulated size of records are calculated via 
sampling a set of " +
+"records. Splitting the ArrowRecordBatch is performed per record, so 
unless a record " +
+"is quite huge, the size of constructed ArrowRecordBatch will be 
around the " +
+"configured value.")
+  .version("3.4.0")
+  .bytesConf(ByteUnit.BYTE)
+  .createWithDefaultString("64MB")

Review Comment:
   (closing the loop) We decided to simply use the number of rows as the condition for constructing an Arrow RecordBatch. This removes all of the new configs being introduced here and also reduces a lot of complexity.






[GitHub] [spark] LuciferYang commented on a diff in pull request #37938: [SPARK-40490][YARN][TESTS] Ensure `YarnShuffleIntegrationSuite` tests registeredExecFile reload scenarios

2022-09-19 Thread GitBox


LuciferYang commented on code in PR #37938:
URL: https://github.com/apache/spark/pull/37938#discussion_r974854247


##
common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:
##
@@ -237,6 +241,10 @@ protected void serviceInit(Configuration externalConf) 
throws Exception {
 
 boolean stopOnFailure = _conf.getBoolean(STOP_ON_FAILURE_KEY, 
DEFAULT_STOP_ON_FAILURE);
 
+if (_recoveryPath == null && _conf.getBoolean(INTEGRATION_TESTING, false)) 
{

Review Comment:
   Using `Utils.isTesting` directly would require fixing some other cases as well, for example:
   ```
   test("recovery db should not be created if NM recovery is not enabled") {
     s1 = new YarnShuffleService
     s1.init(yarnConfig)
     s1._recoveryPath should be (null)
     s1.registeredExecutorFile should be (null)
     s1.secretsFile should be (null)
   }
   ```
   Is it better to use a separate configuration, as in this PR?
   
   






[GitHub] [spark] Yikun commented on pull request #36087: [SPARK-38802][K8S][TESTS] Add Support for `spark.kubernetes.test.(driver|executor)RequestCores`

2022-09-19 Thread GitBox


Yikun commented on PR #36087:
URL: https://github.com/apache/spark/pull/36087#issuecomment-1251792098

   Here is a simple demo to show why we need them: https://github.com/Yikun/spark-docker/pull/5
   
   - build the Docker image with tag v3.3.0
   - test it with the 3.3.0 K8S IT in GitHub Actions
   - after the tests pass, we can publish 3.3.0 safely.





[GitHub] [spark] LuciferYang commented on a diff in pull request #37938: [SPARK-40490][YARN][TESTS] Ensure `YarnShuffleIntegrationSuite` tests registeredExecFile reload scenarios

2022-09-19 Thread GitBox


LuciferYang commented on code in PR #37938:
URL: https://github.com/apache/spark/pull/37938#discussion_r974846876


##
common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:
##
@@ -237,6 +241,10 @@ protected void serviceInit(Configuration externalConf) 
throws Exception {
 
 boolean stopOnFailure = _conf.getBoolean(STOP_ON_FAILURE_KEY, 
DEFAULT_STOP_ON_FAILURE);
 
+if (_recoveryPath == null && _conf.getBoolean(INTEGRATION_TESTING, false)) 
{

Review Comment:
   I am not sure whether it is possible to use `Utils.isTesting` directly; it would require some refactoring because `Utils.isTesting` cannot be used in this module.






[GitHub] [spark] LuciferYang commented on pull request #37938: [SPARK-40490][YARN][TESTS] Ensure `YarnShuffleIntegrationSuite` tests registeredExecFile reload scenarios

2022-09-19 Thread GitBox


LuciferYang commented on PR #37938:
URL: https://github.com/apache/spark/pull/37938#issuecomment-1251782282

   cc @tgravescs 





[GitHub] [spark] LuciferYang opened a new pull request, #37938: [SPARK-40490][YARN][TESTS] Ensure `YarnShuffleIntegrationSuite` tests registeredExecFile reload scenarios

2022-09-19 Thread GitBox


LuciferYang opened a new pull request, #37938:
URL: https://github.com/apache/spark/pull/37938

   ### What changes were proposed in this pull request?
   After SPARK-17321, `YarnShuffleService` will persist data to the local shuffle 
state db and reload data from it only when the Yarn NodeManager starts with 
`YarnConfiguration#NM_RECOVERY_ENABLED = true`.
   
   `YarnShuffleIntegrationSuite` does not set 
`YarnConfiguration#NM_RECOVERY_ENABLED`, and the default value of that 
configuration is false, so `YarnShuffleIntegrationSuite` will neither trigger 
data persistence to the db nor verify the reload of data.
   
   This PR aims to restore the verification of registeredExecFile reload 
scenarios in `YarnShuffleIntegrationSuite`. To achieve this goal, this PR makes 
the following changes:
   
   1. Add a new undocumented configuration 
`spark.yarn.shuffle.integrationTesting` to `YarnShuffleService`, and initialize 
`_recoveryPath` when `_recoveryPath == null && 
spark.yarn.shuffle.integrationTesting == true`.
   
   2. Set `spark.yarn.shuffle.integrationTesting = true` only in 
`YarnShuffleIntegrationSuite`, and add assertions to check that 
`registeredExecFile` is not null, so that the registeredExecFile reload scenario 
is actually verified (a sketch is shown after this list).
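   
   A minimal sketch of the intended test-side change, assuming a hypothetical 
`extraSparkConf` hook (the real suite may wire the config and the assertion 
differently):
   
   ```
   // Sketch only: turn on the new flag for the integration suite ...
   override def extraSparkConf(): Map[String, String] =
     Map("spark.yarn.shuffle.integrationTesting" -> "true")

   // ... and, after the job has run through the external shuffle service,
   // assert the recovery file was actually created so the reload path is hit.
   assert(registeredExecFile != null)
   ```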
   
   ### Why are the changes needed?
   Fix registeredExecFile reload test scenarios.
   
   Why not test by configuring `YarnConfiguration#NM_RECOVERY_ENABLED` as true?
   
   That configuration was tried, but it fails as follows:
   
   **Hadoop 3.3.4**
   
   ```
   build/mvn clean install -pl resource-managers/yarn -Pyarn -Dtest=none 
-DwildcardSuites=org.apache.spark.deploy.yarn.YarnShuffleIntegrationWithLevelDBBackendSuite
 -Phadoop-3
   ```
   
   ```
   2022-09-10T11:44:42.1710230Z Cause: java.lang.ClassNotFoundException: 
org.apache.hadoop.shaded.org.iq80.leveldb.DBException
   2022-09-10T11:44:42.1715234Z at 
java.net.URLClassLoader.findClass(URLClassLoader.java:387)
   2022-09-10T11:44:42.1719347Z at 
java.lang.ClassLoader.loadClass(ClassLoader.java:419)
   2022-09-10T11:44:42.1723090Z at 
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
   2022-09-10T11:44:42.1726759Z at 
java.lang.ClassLoader.loadClass(ClassLoader.java:352)
   2022-09-10T11:44:42.1731028Z at 
org.apache.hadoop.yarn.server.nodemanager.NodeManager.initAndStartRecoveryStore(NodeManager.java:313)
   2022-09-10T11:44:42.1735424Z at 
org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceInit(NodeManager.java:370)
   2022-09-10T11:44:42.1740303Z at 
org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
   2022-09-10T11:44:42.1745576Z at 
org.apache.hadoop.yarn.server.MiniYARNCluster$NodeManagerWrapper.serviceInit(MiniYARNCluster.java:597)
   2022-09-10T11:44:42.1828858Z at 
org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
   2022-09-10T11:44:42.1829712Z at 
org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:109)
   2022-09-10T11:44:42.1830633Z at 
org.apache.hadoop.yarn.server.MiniYARNCluster.serviceInit(MiniYARNCluster.java:327)
   2022-09-10T11:44:42.1831431Z at 
org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
   2022-09-10T11:44:42.1832279Z at 
org.apache.spark.deploy.yarn.BaseYarnClusterSuite.beforeAll(BaseYarnClusterSuite.scala:112)
   ```
   
   **Hadoop 2.7.4**
   
   ```
   build/mvn clean install -pl resource-managers/yarn -Pyarn -Dtest=none 
-DwildcardSuites=org.apache.spark.deploy.yarn.YarnShuffleIntegrationWithLevelDBBackendSuite
 -Phadoop-2
   ```
   
   ```
   YarnShuffleIntegrationWithLevelDBBackendSuite:
   org.apache.spark.deploy.yarn.YarnShuffleIntegrationWithLevelDBBackendSuite 
*** ABORTED ***
 java.lang.IllegalArgumentException: Cannot support recovery with an 
ephemeral server port. Check the setting of yarn.nodemanager.address
 at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl.serviceStart(ContainerManagerImpl.java:395)
 at 
org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
 at 
org.apache.hadoop.service.CompositeService.serviceStart(CompositeService.java:120)
 at 
org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceStart(NodeManager.java:272)
 at 
org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
 at 
org.apache.hadoop.yarn.server.MiniYARNCluster$NodeManagerWrapper.serviceStart(MiniYARNCluster.java:560)
 at 
org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
 at 
org.apache.hadoop.service.CompositeService.serviceStart(CompositeService.java:120)
 at 
org.apache.hadoop.yarn.server.MiniYARNCluster.serviceStart(MiniYARNCluster.java:278)
 at 
org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
 ...
   Run completed in 3 seconds, 992 milliseconds.
   Total number of tests run: 0
   Suites: completed 1, aborted 1
   Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
   *** 1 SUITE ABORTED ***
 

[GitHub] [spark] beliefer opened a new pull request, #37937: [SPARK-40491][SQL] Expose a jdbcRDD function in SparkContext

2022-09-19 Thread GitBox


beliefer opened a new pull request, #37937:
URL: https://github.com/apache/spark/pull/37937

   ### What changes were proposed in this pull request?
   According to the legacy documentation of `JdbcRDD`, we should expose a jdbcRDD 
function in `SparkContext`.
   
   
   ### Why are the changes needed?
   A `jdbcRDD` function in `SparkContext` is easier for users to use; a sketch is 
shown below.
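   
   For context, a rough sketch of how the legacy `JdbcRDD` is constructed today, 
and the kind of convenience this PR is after (the `sc.jdbcRDD(...)` shape below 
is only an assumption, not the PR's final signature):
   
   ```
   import java.sql.DriverManager
   import org.apache.spark.rdd.JdbcRDD

   // Today: construct the RDD directly; the SQL must contain two '?'
   // placeholders bound to the partition range.
   val rdd = new JdbcRDD(
     sc,
     () => DriverManager.getConnection("jdbc:h2:mem:testdb"),
     "SELECT id, name FROM people WHERE ? <= id AND id <= ?",
     lowerBound = 1, upperBound = 1000, numPartitions = 4,
     mapRow = rs => (rs.getLong(1), rs.getString(2)))

   // Proposed (hypothetical shape): a convenience method on SparkContext.
   // val rdd = sc.jdbcRDD(getConnection, sql, 1, 1000, 4, mapRow)
   ```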
   
   
   ### Does this PR introduce _any_ user-facing change?
   'No'.
   It just adds a new API.
   
   
   ### How was this patch tested?
   New tests.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] alex-balikov commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


alex-balikov commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974707164


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.ipc.ArrowStreamWriter
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.api.python._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.api.python.PythonSQLUtils
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.python.ApplyInPandasWithStatePythonRunner.{InType,
 OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER}
+import 
org.apache.spark.sql.execution.python.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA
+import org.apache.spark.sql.execution.streaming.GroupStateImpl
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
+
+
+/**
+ * A variant implementation of [[ArrowPythonRunner]] to serve the operation
+ * applyInPandasWithState.
+ */
+class ApplyInPandasWithStatePythonRunner(
+funcs: Seq[ChainedPythonFunctions],
+evalType: Int,
+argOffsets: Array[Array[Int]],
+inputSchema: StructType,
+override protected val timeZoneId: String,
+initialWorkerConf: Map[String, String],
+stateEncoder: ExpressionEncoder[Row],
+keySchema: StructType,
+valueSchema: StructType,
+stateValueSchema: StructType,
+softLimitBytesPerBatch: Long,
+minDataCountForSample: Int,
+softTimeoutMillsPurgeBatch: Long)
+  extends BasePythonRunner[InType, OutType](funcs, evalType, argOffsets)
+  with PythonArrowInput[InType]
+  with PythonArrowOutput[OutType] {
+
+  override protected val schema: StructType = inputSchema.add("__state", 
STATE_METADATA_SCHEMA)
+
+  override val simplifiedTraceback: Boolean = 
SQLConf.get.pysparkSimplifiedTraceback
+
+  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+  require(
+bufferSize >= 4,
+"Pandas execution requires more than 4 bytes. Please set higher buffer. " +
+  s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.")
+
+  // applyInPandasWithState has its own mechanism to construct the Arrow 
RecordBatch instance.
+  // Configurations are both applied to executor and Python worker, set them 
to the worker conf
+  // to let Python worker read the config properly.
+  override protected val workerConf: Map[String, String] = initialWorkerConf +
+(SQLConf.MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH.key ->
+  softLimitBytesPerBatch.toString) +
+(SQLConf.MAP_PANDAS_UDF_WITH_STATE_MIN_DATA_COUNT_FOR_SAMPLE.key ->
+  minDataCountForSample.toString) +
+(SQLConf.MAP_PANDAS_UDF_WITH_STATE_SOFT_TIMEOUT_PURGE_BATCH.key ->
+  softTimeoutMillsPurgeBatch.toString)
+
+  private val stateRowDeserializer = stateEncoder.createDeserializer()
+
+  override protected def handleMetadataBeforeExec(stream: DataOutputStream): 
Unit = {
+super.handleMetadataBeforeExec(stream)
+// Also write the schema for state value
+PythonRDD.writeUTF(stateValueSchema.json, stream)
+  }
+
+  protected def writeIteratorToArrowStream(
+  root: VectorSchemaRoot,
+  writer: ArrowStreamWriter,
+  dataOut: DataOutputStream,
+  inputIterator: Iterator[InType]): Unit = {
+val w = new ApplyInPandasWithStateWriter(root, writer, 
softLimitBytesPerBatch,
+  minDataCountForSample, softTimeoutMillsPurgeBatch)
+
+while (inputIterator.hasNext) {
+  val (keyRow, groupState, dataIter) = inputIterator.next()
+  assert(dataIter.hasNext, "should have at least one data row!")
+  w.startNewGroup(keyRow, groupState)
+
+  while (dataIter.hasNext) {
+val dataRow = dataIter.next()
+w.writeRow(dataRow)

[GitHub] [spark] zhengruifeng commented on a diff in pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

2022-09-19 Thread GitBox


zhengruifeng commented on code in PR #37918:
URL: https://github.com/apache/spark/pull/37918#discussion_r974838677


##
mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala:
##
@@ -496,18 +499,23 @@ class ALSModel private[ml] (
   .iterator.map { j => (srcId, dstIds(j), scores(j)) }
   }
 }
-  }
-// We'll force the IDs to be Int. Unfortunately this converts IDs to Int 
in the output.
-val topKAggregator = new TopByKeyAggregator[Int, Int, Float](num, 
Ordering.by(_._2))
-val recs = ratings.as[(Int, Int, 
Float)].groupByKey(_._1).agg(topKAggregator.toColumn)
-  .toDF("id", "recommendations")
+  }.toDF(srcOutputColumn, dstOutputColumn, ratingColumn)
+
+val aggFunc = CollectOrdered(struct(ratingColumn, dstOutputColumn).expr, 
num, true)
+  .toAggregateExpression(false)

Review Comment:
   sure, I think we don't want to expose it, so mark it `private[spark]`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gengliangwang commented on a diff in pull request #37840: [SPARK-40416][SQL] Move subquery expression CheckAnalysis error messages to use the new error framework

2022-09-19 Thread GitBox


gengliangwang commented on code in PR #37840:
URL: https://github.com/apache/spark/pull/37840#discussion_r974833681


##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -3911,6 +3911,15 @@ object SQLConf {
 .checkValues(ErrorMessageFormat.values.map(_.toString))
 .createWithDefault(ErrorMessageFormat.PRETTY.toString)
 
+  val INCLUDE_PLANS_IN_ERRORS = buildConf("spark.sql.error.includePlans")

Review Comment:
   @dtenedor let's revert the changes in SQLConf.scala



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang commented on pull request #37926: [SPARK-40484][BUILD] Upgrade log4j2 to 2.19.0

2022-09-19 Thread GitBox


LuciferYang commented on PR #37926:
URL: https://github.com/apache/spark/pull/37926#issuecomment-1251761360

   thanks @viirya @srowen @dongjoon-hyun 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Yikun commented on pull request #36087: [SPARK-38802][K8S][TESTS] Add Support for `spark.kubernetes.test.(driver|executor)RequestCores`

2022-09-19 Thread GitBox


Yikun commented on PR #36087:
URL: https://github.com/apache/spark/pull/36087#issuecomment-1251757187

   And also this https://github.com/apache/spark/pull/36096


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Yikun commented on pull request #36087: [SPARK-38802][K8S][TESTS] Add Support for `spark.kubernetes.test.(driver|executor)RequestCores`

2022-09-19 Thread GitBox


Yikun commented on PR #36087:
URL: https://github.com/apache/spark/pull/36087#issuecomment-1251756266

   @dongjoon-hyun Could we backport this to branch-3.3? It would be very helpful 
for running the branch-3.3 K8S IT in GitHub Actions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on pull request #37929: [SPARK-40486][PS] Implement `spearman` and `kendall` in `DataFrame.corrwith`

2022-09-19 Thread GitBox


zhengruifeng commented on PR #37929:
URL: https://github.com/apache/spark/pull/37929#issuecomment-1251750796

   cc @itholic  @HyukjinKwon 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Yikun commented on a diff in pull request #37923: [SPARK-40334][PS] Implement `GroupBy.prod`

2022-09-19 Thread GitBox


Yikun commented on code in PR #37923:
URL: https://github.com/apache/spark/pull/37923#discussion_r974808692


##
python/pyspark/pandas/groupby.py:
##
@@ -993,6 +994,101 @@ def nth(self, n: int) -> FrameLike:
 
 return self._prepare_return(DataFrame(internal))
 
+def prod(self, numeric_only: Optional[bool] = True, min_count: int = 0):

Review Comment:
   ```suggestion
   def prod(self, numeric_only: Optional[bool] = True, min_count: int = 0) 
-> FrameLike:
   ```



##
python/pyspark/pandas/groupby.py:
##
@@ -18,7 +18,6 @@
 """
 A wrapper for GroupedData to behave similar to pandas GroupBy.
 """
-

Review Comment:
   unrelated change



##
python/pyspark/pandas/groupby.py:
##
@@ -993,6 +994,101 @@ def nth(self, n: int) -> FrameLike:
 
 return self._prepare_return(DataFrame(internal))
 
+def prod(self, numeric_only: Optional[bool] = True, min_count: int = 0):
+"""
+Compute prod of groups.
+
+.. versionadded:: 3.4.0
+
+Parameters
+--
+numeric_only : bool, default False
+Include only float, int, boolean columns. If None, will attempt to 
use
+everything, then use only numeric data.
+
+min_count: int, default 0
+The required number of valid values to perform the operation.
+If fewer than min_count non-NA values are present the result will 
be NA.
+
+Returns
+---
+pyspark.pandas.Series or pyspark.pandas.DataFrame
+
+See Also
+
+pyspark.pandas.Series.groupby
+pyspark.pandas.DataFrame.groupby
+
+Examples
+
+>>> df = ps.DataFrame({'A': [1, 1, 2, 1, 2],
+...'B': [np.nan, 2, 3, 4, 5],
+...'C': [1, 2, 1, 1, 2],
+...'D': [True, False, True, False, True]})
+
+Groupby one column and return the prod of the remaining columns in
+each group.
+
+>>> df.groupby('A').prod().sort_index()
+ B  C  D
+A
+1  8.0  2  0
+2  15.0 2  1
+
+>>> df.groupby('A').prod(min_count=3).sort_index()
+ B  C   D
+A
+1  NaN  2.0  0.0
+2  NaN NaN  NaN
+"""
+
+self._validate_agg_columns(numeric_only=numeric_only, 
function_name="prod")
+
+groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in 
range(len(self._groupkeys))]
+internal, agg_columns, sdf = self._prepare_reduce(
+groupkey_names=groupkey_names,
+accepted_spark_types=(NumericType, BooleanType),
+bool_to_numeric=True,
+)
+
+psdf: DataFrame = DataFrame(internal)
+if len(psdf._internal.column_labels) > 0:
+tmp_count_column = "__tmp_%s_count_col__"

Review Comment:
   ```suggestion
   tmp_count_column = verify_temp_column_name(psdf, 
"__tmp_%s_count_col__")
   ```
   
   You might want to verify the column name to avoid a potential column name 
conflict.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


HeartSaVioR commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974805894


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.ipc.ArrowStreamWriter
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.api.python._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.api.python.PythonSQLUtils
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.python.ApplyInPandasWithStatePythonRunner.{InType,
 OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER}
+import 
org.apache.spark.sql.execution.python.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA
+import org.apache.spark.sql.execution.streaming.GroupStateImpl
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
+
+
+/**
+ * A variant implementation of [[ArrowPythonRunner]] to serve the operation
+ * applyInPandasWithState.
+ */
+class ApplyInPandasWithStatePythonRunner(
+funcs: Seq[ChainedPythonFunctions],
+evalType: Int,
+argOffsets: Array[Array[Int]],
+inputSchema: StructType,
+override protected val timeZoneId: String,
+initialWorkerConf: Map[String, String],
+stateEncoder: ExpressionEncoder[Row],
+keySchema: StructType,
+valueSchema: StructType,
+stateValueSchema: StructType,
+softLimitBytesPerBatch: Long,
+minDataCountForSample: Int,
+softTimeoutMillsPurgeBatch: Long)
+  extends BasePythonRunner[InType, OutType](funcs, evalType, argOffsets)
+  with PythonArrowInput[InType]
+  with PythonArrowOutput[OutType] {
+
+  override protected val schema: StructType = inputSchema.add("__state", 
STATE_METADATA_SCHEMA)
+
+  override val simplifiedTraceback: Boolean = 
SQLConf.get.pysparkSimplifiedTraceback
+
+  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+  require(
+bufferSize >= 4,
+"Pandas execution requires more than 4 bytes. Please set higher buffer. " +
+  s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.")
+
+  // applyInPandasWithState has its own mechanism to construct the Arrow 
RecordBatch instance.
+  // Configurations are both applied to executor and Python worker, set them 
to the worker conf
+  // to let Python worker read the config properly.
+  override protected val workerConf: Map[String, String] = initialWorkerConf +
+(SQLConf.MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH.key ->
+  softLimitBytesPerBatch.toString) +
+(SQLConf.MAP_PANDAS_UDF_WITH_STATE_MIN_DATA_COUNT_FOR_SAMPLE.key ->
+  minDataCountForSample.toString) +
+(SQLConf.MAP_PANDAS_UDF_WITH_STATE_SOFT_TIMEOUT_PURGE_BATCH.key ->
+  softTimeoutMillsPurgeBatch.toString)
+
+  private val stateRowDeserializer = stateEncoder.createDeserializer()
+
+  override protected def handleMetadataBeforeExec(stream: DataOutputStream): 
Unit = {
+super.handleMetadataBeforeExec(stream)
+// Also write the schema for state value
+PythonRDD.writeUTF(stateValueSchema.json, stream)
+  }
+
+  protected def writeIteratorToArrowStream(

Review Comment:
   I agree the order of parameters doesn't strictly follow a well-known best 
practice, but the change requires modifying the base class rather than this one. 
It may need a follow-up JIRA ticket / PR to address this in general.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


HeartSaVioR commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974803979


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.ipc.ArrowStreamWriter
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.api.python._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.api.python.PythonSQLUtils
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.python.ApplyInPandasWithStatePythonRunner.{InType,
 OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER}
+import 
org.apache.spark.sql.execution.python.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA
+import org.apache.spark.sql.execution.streaming.GroupStateImpl
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
+
+
+/**
+ * A variant implementation of [[ArrowPythonRunner]] to serve the operation
+ * applyInPandasWithState.
+ */
+class ApplyInPandasWithStatePythonRunner(
+funcs: Seq[ChainedPythonFunctions],
+evalType: Int,
+argOffsets: Array[Array[Int]],
+inputSchema: StructType,
+override protected val timeZoneId: String,
+initialWorkerConf: Map[String, String],
+stateEncoder: ExpressionEncoder[Row],
+keySchema: StructType,
+valueSchema: StructType,
+stateValueSchema: StructType,
+softLimitBytesPerBatch: Long,
+minDataCountForSample: Int,
+softTimeoutMillsPurgeBatch: Long)
+  extends BasePythonRunner[InType, OutType](funcs, evalType, argOffsets)
+  with PythonArrowInput[InType]
+  with PythonArrowOutput[OutType] {
+
+  override protected val schema: StructType = inputSchema.add("__state", 
STATE_METADATA_SCHEMA)
+
+  override val simplifiedTraceback: Boolean = 
SQLConf.get.pysparkSimplifiedTraceback
+
+  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+  require(
+bufferSize >= 4,

Review Comment:
   This is borrowed from ArrowPythonRunner. Btw, I realized we should not allow 
the Arrow RecordBatch in this runner to be split up due to the buffer size - this 
runner has to have full control of the Arrow RecordBatch. We'll have to set this 
to a constant, something like Long.MAX_VALUE.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


HeartSaVioR commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974798726


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.ipc.ArrowStreamWriter
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.api.python._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.api.python.PythonSQLUtils
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.python.ApplyInPandasWithStatePythonRunner.{InType,
 OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER}
+import 
org.apache.spark.sql.execution.python.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA
+import org.apache.spark.sql.execution.streaming.GroupStateImpl
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
+
+
+/**
+ * A variant implementation of [[ArrowPythonRunner]] to serve the operation
+ * applyInPandasWithState.
+ */
+class ApplyInPandasWithStatePythonRunner(
+funcs: Seq[ChainedPythonFunctions],
+evalType: Int,
+argOffsets: Array[Array[Int]],
+inputSchema: StructType,
+override protected val timeZoneId: String,
+initialWorkerConf: Map[String, String],

Review Comment:
   We can't use `workerConf` here since the class already has `override protected 
val workerConf`. So the choice is between something like `_workerConf` and 
`initialWorkerConf`, and a descriptive prefix doesn't sound too bad compared to 
`_`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] warrenzhu25 commented on a diff in pull request #37924: [SPARK-40481][CORE] Ignore stage fetch failure caused by decommissioned executor

2022-09-19 Thread GitBox


warrenzhu25 commented on code in PR #37924:
URL: https://github.com/apache/spark/pull/37924#discussion_r974793372


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -1860,8 +1867,18 @@ private[spark] class DAGScheduler(
 s"(attempt ${failedStage.latestInfo.attemptNumber}) running")
 } else {
   failedStage.failedAttemptIds.add(task.stageAttemptId)
+  val ignoreStageFailure = ignoreDecommissionFetchFailure &&
+isExecutorDecommissioned(taskScheduler, bmAddress)
+  if (ignoreStageFailure) {
+logInfo("Ignoring fetch failure from $task of $failedStage attempt 
" +
+  s"${task.stageAttemptId} when count 
spark.stage.maxConsecutiveAttempts " +
+  "as executor ${bmAddress.executorId} is decommissioned and " +
+  s" ${config.STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE.key}=true")
+  }
+
   val shouldAbortStage =
-failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts ||
+(!ignoreStageFailure &&
+  failedStage.failedAttemptIds.size >= 
maxConsecutiveStageAttempts) ||
 disallowStageRetryForTest

Review Comment:
   Good catch. Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


HeartSaVioR commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974784396


##
python/pyspark/sql/pandas/serializers.py:
##
@@ -371,3 +375,292 @@ def load_stream(self, stream):
 raise ValueError(
 "Invalid number of pandas.DataFrames in group 
{0}".format(dataframes_in_group)
 )
+
+
+class ApplyInPandasWithStateSerializer(ArrowStreamPandasUDFSerializer):

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a diff in pull request #37934: [SPARK-40477][SQL] Support `NullType` in `ColumnarBatchRow`

2022-09-19 Thread GitBox


viirya commented on code in PR #37934:
URL: https://github.com/apache/spark/pull/37934#discussion_r974783229


##
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala:
##
@@ -1461,10 +1462,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
   vectorizedReader.initBatch(schema, partitionValues)
   vectorizedReader.nextKeyValue()
   val row = vectorizedReader.getCurrentValue.asInstanceOf[InternalRow]
-
-  // Use `GenericMutableRow` by explicitly copying rather than 
`ColumnarBatch`
-  // in order to use get(...) method which is not implemented in 
`ColumnarBatch`.

Review Comment:
   But when it is used for a partition column, `ColumnarBatchRow` is used instead 
of `GenericMutableRow` (i.e. `copy` won't be called). Is that correct?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on pull request #37923: [SPARK-40334][PS] Implement `GroupBy.prod`

2022-09-19 Thread GitBox


zhengruifeng commented on PR #37923:
URL: https://github.com/apache/spark/pull/37923#issuecomment-1251698604

   ```
   Oh no!   
   2 files would be reformatted, 352 files would be left unchanged.
   Please run 'dev/reformat-python' script.
   1
   Error: Process completed with exit code 1.
   ```
   
   `lint-python` failed. You can install the related packages in your env via 
`pip install -r dev/requirements.txt`, then reformat the Python files via 
`dev/reformat-python` and check the lint locally via `dev/lint-python`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on a diff in pull request #37923: [SPARK-40334][PS] Implement `GroupBy.prod`

2022-09-19 Thread GitBox


zhengruifeng commented on code in PR #37923:
URL: https://github.com/apache/spark/pull/37923#discussion_r974780246


##
python/pyspark/pandas/groupby.py:
##
@@ -993,6 +994,101 @@ def nth(self, n: int) -> FrameLike:
 
 return self._prepare_return(DataFrame(internal))
 
+def prod(self, numeric_only: Optional[bool] = True, min_count: int = 0):
+"""
+Compute prod of groups.
+
+.. versionadded:: 3.4.0
+
+Parameters
+--
+numeric_only : bool, default False
+Include only float, int, boolean columns. If None, will attempt to 
use
+everything, then use only numeric data.
+
+min_count: int, default 0
+The required number of valid values to perform the operation.
+If fewer than min_count non-NA values are present the result will 
be NA.
+
+Returns
+---
+pyspark.pandas.Series or pyspark.pandas.DataFrame

Review Comment:
   ```suggestion
   Series or DataFrame
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on a diff in pull request #37923: [SPARK-40334][PS] Implement `GroupBy.prod`

2022-09-19 Thread GitBox


zhengruifeng commented on code in PR #37923:
URL: https://github.com/apache/spark/pull/37923#discussion_r974779483


##
python/pyspark/pandas/groupby.py:
##
@@ -44,6 +43,7 @@
 )
 import warnings
 
+import numpy as np

Review Comment:
   `numpy` in the docstring was imported in `def _test()`, so no need to import 
it here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] chaoqin-li1123 commented on pull request #37935: Do maintenance before streaming StateStore unload

2022-09-19 Thread GitBox


chaoqin-li1123 commented on PR #37935:
URL: https://github.com/apache/spark/pull/37935#issuecomment-1251692899

   @HeartSaVioR 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] kazuyukitanimura commented on a diff in pull request #37934: [SPARK-40477][SQL] Support `NullType` in `ColumnarBatchRow`

2022-09-19 Thread GitBox


kazuyukitanimura commented on code in PR #37934:
URL: https://github.com/apache/spark/pull/37934#discussion_r974775400


##
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala:
##
@@ -1461,10 +1462,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
   vectorizedReader.initBatch(schema, partitionValues)
   vectorizedReader.nextKeyValue()
   val row = vectorizedReader.getCurrentValue.asInstanceOf[InternalRow]
-
-  // Use `GenericMutableRow` by explicitly copying rather than 
`ColumnarBatch`
-  // in order to use get(...) method which is not implemented in 
`ColumnarBatch`.
-  val actual = row.copy().get(1, dt)

Review Comment:
   That's correct. The `copy()` converts `row` from `ColumnarBatchRow` to 
`GenericInternalRow`, where null works. 
   With this PR, `null` is directly supported in `ColumnarBatchRow.get()`.
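   
   A tiny illustration of the difference (mirroring the test above; not new API):
   
   ```
   // Before: ColumnarBatchRow.get could not handle NullType, so the test
   // copied the row into a GenericInternalRow first.
   val viaCopy = row.copy().get(1, dt)
   // After this PR: get() handles NullType directly on ColumnarBatchRow.
   val direct = row.get(1, dt)  // returns null when dt is NullType
   ```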



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] kazuyukitanimura commented on a diff in pull request #37934: [SPARK-40477][SQL] Support `NullType` in `ColumnarBatchRow`

2022-09-19 Thread GitBox


kazuyukitanimura commented on code in PR #37934:
URL: https://github.com/apache/spark/pull/37934#discussion_r974772538


##
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala:
##
@@ -1461,10 +1462,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
   vectorizedReader.initBatch(schema, partitionValues)
   vectorizedReader.nextKeyValue()
   val row = vectorizedReader.getCurrentValue.asInstanceOf[InternalRow]
-
-  // Use `GenericMutableRow` by explicitly copying rather than 
`ColumnarBatch`
-  // in order to use get(...) method which is not implemented in 
`ColumnarBatch`.

Review Comment:
   I think so; over time `get` has been implemented. cc original author of this 
test @HyukjinKwon  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR closed pull request #37917: [SPARK-40466][SS] Improve the error message when DSv2 is disabled while DSv1 is not available

2022-09-19 Thread GitBox


HeartSaVioR closed pull request #37917: [SPARK-40466][SS] Improve the error 
message when DSv2 is disabled while DSv1 is not available
URL: https://github.com/apache/spark/pull/37917


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on pull request #37917: [SPARK-40466][SS] Improve the error message when DSv2 is disabled while DSv1 is not available

2022-09-19 Thread GitBox


HeartSaVioR commented on PR #37917:
URL: https://github.com/apache/spark/pull/37917#issuecomment-1251667880

   Thanks! Merging to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WweiL opened a new pull request, #37936: Add additional tests to StreamingSessionWindowSuite

2022-09-19 Thread GitBox


WweiL opened a new pull request, #37936:
URL: https://github.com/apache/spark/pull/37936

   
   
   ## What changes were proposed in this pull request?
   Add complex tests to `StreamingSessionWindowSuite`. Concretely, I created 
two helper functions, 
   - one is called `sessionWindowQueryNestedKey`, which would convert 
`sessionId` from the single word key used in `sessionWindowQuery` to a nested 
column key. For example: `"hello" -> (("hello", "hello"), "hello")`.
   - The other is called `sessionWindowQueryMultiColKey`. It would convert 
`sessionId` from the single word key used in `sessionWindowQuery` to two 
columns. For example: "hello" -> col1: ("hello", "hello"), col2: "hello"
   
   With the two new helper functions, I added more tests for `complete mode` and 
`append mode`, and also for `cap gap duration` and `async state`. The logic of 
the tests is not changed at all, just the key.
   
   For the aggregation test (`session window - with more aggregation 
functions`), I added some more functions to test, and I tried the `first()` and 
`last()` functions on a nested triple, created using the same method as 
above. 
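   
   A rough sketch of the key conversions described above, assuming an input 
DataFrame `df` with a `sessionId` column (the column names are illustrative, not 
the exact ones used in the suite):
   
   ```
   import org.apache.spark.sql.functions.{col, struct}

   // sessionWindowQueryNestedKey: "hello" -> (("hello", "hello"), "hello")
   val nestedKeyDf = df.withColumn(
     "sessionId", struct(struct(col("sessionId"), col("sessionId")), col("sessionId")))

   // sessionWindowQueryMultiColKey: "hello" -> col1: ("hello", "hello"), col2: "hello"
   val multiColKeyDf = df
     .withColumn("col1", struct(col("sessionId"), col("sessionId")))
     .withColumn("col2", col("sessionId"))
   ```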
   
   
   
   ## How was this patch tested?
   These changes are tests themselves.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mengxr commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-09-19 Thread GitBox


mengxr commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r974747883


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if batch_size <= 0 or batch_size >= len(df):
+yield df
+else:
+# for batch in np.array_split(df, (len(df.index) + batch_size - 1) // 
batch_size):
+for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+yield batch
+
+
+def has_tensor_cols(df: pd.DataFrame) -> bool:
+"""Check if input DataFrame contains any tensor-valued columns"""
+if any(df.dtypes == np.object_):
+# pd.DataFrame object types can contain different types, e.g. string, 
dates, etc.
+# so inspect a row and check for array/list type
+sample = df.iloc[0]
+return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in 
sample])
+else:
+return False
+
+
+def batch_infer_udf(
+predict_batch_fn: Callable,
+return_type: DataType = ArrayType(FloatType()),
+batch_size: int = -1,
+input_names: list[str] = [],
+input_tensor_shapes: list[list[int]] = [],
+**kwargs: Any,
+) -> Callable:
+"""Given a function which loads a model, returns a pandas_udf for 
inferencing over that model.
+
+This will handle:
+- conversion of the Spark DataFrame to numpy arrays.
+- batching of the inputs sent to the model predict() function.
+- caching of the model and prediction function on the executors.
+
+This assumes that the `predict_batch_fn` encapsulates all of the necessary 
dependencies for
+running the model or the Spark executor environment already satisfies all 
runtime requirements.
+
+When selecting columns in pyspark SQL, users are required to always use 
`struct` for simplicity.
+
+For the conversion of Spark DataFrame to numpy, the following table 
describes the behavior,
+where tensor columns in the Spark DataFrame must be represented as a 
flattened 1-D array/list.
+
+| dataframe \\ model | single input | multiple inputs |
+| :- | :--- | :-- |
+| single-col scalar  | 1| N/A |
+| single-col tensor  | 1,2  | N/A |
+| multi-col scalar   | 3| 4   |

Review Comment:
   Discussed with @leewyang offline. We will drop this scenario.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on pull request #37729: Revert "[SPARK-33861][SQL] Simplify conditional in predicate"

2022-09-19 Thread GitBox


dongjoon-hyun commented on PR #37729:
URL: https://github.com/apache/spark/pull/37729#issuecomment-1251641154

   Sorry, but I missed that this is an ancient patch. To @wangyum , we need a 
new JIRA when we revert already released patches.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a diff in pull request #37934: [SPARK-40477][SQL] Support `NullType` in `ColumnarBatchRow`

2022-09-19 Thread GitBox


viirya commented on code in PR #37934:
URL: https://github.com/apache/spark/pull/37934#discussion_r974736080


##
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala:
##
@@ -1461,10 +1462,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
   vectorizedReader.initBatch(schema, partitionValues)
   vectorizedReader.nextKeyValue()
   val row = vectorizedReader.getCurrentValue.asInstanceOf[InternalRow]
-
-  // Use `GenericMutableRow` by explicitly copying rather than 
`ColumnarBatch`
-  // in order to use get(...) method which is not implemented in 
`ColumnarBatch`.
-  val actual = row.copy().get(1, dt)

Review Comment:
   I can run this test with `null` without this change (L1467).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a diff in pull request #37934: [SPARK-40477][SQL] Support `NullType` in `ColumnarBatchRow`

2022-09-19 Thread GitBox


viirya commented on code in PR #37934:
URL: https://github.com/apache/spark/pull/37934#discussion_r974734194


##
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala:
##
@@ -1461,10 +1462,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
   vectorizedReader.initBatch(schema, partitionValues)
   vectorizedReader.nextKeyValue()
   val row = vectorizedReader.getCurrentValue.asInstanceOf[InternalRow]
-
-  // Use `GenericMutableRow` by explicitly copying rather than 
`ColumnarBatch`
-  // in order to use get(...) method which is not implemented in 
`ColumnarBatch`.

Review Comment:
   Hmm, why does it say `get` is not implemented in `ColumnarBatch`? Is this 
comment just outdated?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] chaoqin-li1123 opened a new pull request, #37935: Do maintenance before streaming StateStore unload

2022-09-19 Thread GitBox


chaoqin-li1123 opened a new pull request, #37935:
URL: https://github.com/apache/spark/pull/37935

   
   
   ### What changes were proposed in this pull request?
   Before unloading a StateStore, perform a cleanup.
   
   
   
   ### Why are the changes needed?
   Currently the maintenance of a StateStore is performed by a periodic task in 
the management thread. If a streaming query becomes inactive before the next 
maintenance task fires, its StateStore will be unloaded without cleanup. 
   There are 2 cases when a StateStore is unloaded.
   1. The StateStoreProvider is no longer active in the system, for example, when 
a query ends or the Spark context terminates.
   2. There is another active StateStoreProvider in the system, for example, when 
a partition is reassigned.
   
   In case 1, we should do one last round of maintenance before unloading the 
instance; a sketch of that idea is shown below.
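   
   The sketch assumes the existing `StateStore.unload` / 
`StateStoreProvider.doMaintenance()` / `close()` methods; the exact placement is 
up to this PR:
   
   ```
   // Sketch only: when a provider is removed because the query or the context
   // ended, run one final maintenance pass before closing it.
   def unload(id: StateStoreProviderId): Unit = loadedProviders.synchronized {
     loadedProviders.remove(id).foreach { provider =>
       provider.doMaintenance()  // last cleanup, e.g. delete redundant delta files
       provider.close()
     }
   }
   ```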
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   Shutdown delay of a query may increase because a maintenance task is 
scheduled.
   
   
   
   ### How was this patch tested?
   Added an integration test that verifies that redundant delta files are deleted 
when StateStore instances are deactivated and unloaded. 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wbo4958 commented on pull request #37855: [SPARK-40407][SQL] Fix the potential data skew caused by df.repartition

2022-09-19 Thread GitBox


wbo4958 commented on PR #37855:
URL: https://github.com/apache/spark/pull/37855#issuecomment-1251592287

   > @wbo4958
   > 
   > Issue: The xgboost code uses rdd barrier mode, but barrier mode does not 
work with `coalesce` operator.
   
   @mridulm just suggested using another randomization approach, borrowed from 
RDD.coalesce, to get the starting position; it will not introduce any coalesce 
into repartition.
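   
   A rough sketch of that suggestion - borrowing the randomized start position 
used by round-robin partitioning / `RDD.coalesce` - as an illustration of the 
idea, not the PR's actual code:
   
   ```
   import scala.util.Random
   import scala.util.hashing

   // Each input partition starts at a random target partition (seeded by its
   // index) instead of always starting at 0, so the first records of every
   // partition do not all pile onto the same targets.
   def roundRobinWithRandomStart[T](
       index: Int, iter: Iterator[T], numPartitions: Int): Iterator[(Int, T)] = {
     var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
     iter.map { item =>
       position += 1
       (position % numPartitions, item)
     }
   }
   ```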


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun closed pull request #37424: [SPARK-39991][SQL][AQE] Use available column statistics from completed query stages

2022-09-19 Thread GitBox


dongjoon-hyun closed pull request #37424: [SPARK-39991][SQL][AQE] Use available 
column statistics from completed query stages
URL: https://github.com/apache/spark/pull/37424


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #37922: [WIP][SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-09-19 Thread GitBox


AmplabJenkins commented on PR #37922:
URL: https://github.com/apache/spark/pull/37922#issuecomment-1251551259

   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #37923: [SPARK-40334][PS] Implement `GroupBy.prod`

2022-09-19 Thread GitBox


AmplabJenkins commented on PR #37923:
URL: https://github.com/apache/spark/pull/37923#issuecomment-1251551212

   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #37924: [SPARK-40481][CORE] Ignore stage fetch failure caused by decommissioned executor

2022-09-19 Thread GitBox


AmplabJenkins commented on PR #37924:
URL: https://github.com/apache/spark/pull/37924#issuecomment-1251551174

   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] alex-balikov commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


alex-balikov commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974680745


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.ipc.ArrowStreamWriter
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.api.python._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.api.python.PythonSQLUtils
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.python.ApplyInPandasWithStatePythonRunner.{InType,
 OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER}
+import 
org.apache.spark.sql.execution.python.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA
+import org.apache.spark.sql.execution.streaming.GroupStateImpl
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
+
+
+/**
+ * A variant implementation of [[ArrowPythonRunner]] to serve the operation
+ * applyInPandasWithState.
+ */
+class ApplyInPandasWithStatePythonRunner(
+funcs: Seq[ChainedPythonFunctions],
+evalType: Int,
+argOffsets: Array[Array[Int]],
+inputSchema: StructType,
+override protected val timeZoneId: String,
+initialWorkerConf: Map[String, String],
+stateEncoder: ExpressionEncoder[Row],
+keySchema: StructType,
+valueSchema: StructType,
+stateValueSchema: StructType,
+softLimitBytesPerBatch: Long,
+minDataCountForSample: Int,
+softTimeoutMillsPurgeBatch: Long)
+  extends BasePythonRunner[InType, OutType](funcs, evalType, argOffsets)
+  with PythonArrowInput[InType]
+  with PythonArrowOutput[OutType] {
+
+  override protected val schema: StructType = inputSchema.add("__state", 
STATE_METADATA_SCHEMA)
+
+  override val simplifiedTraceback: Boolean = 
SQLConf.get.pysparkSimplifiedTraceback
+
+  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+  require(
+bufferSize >= 4,

Review Comment:
   Why do we care to throw this exception? We may just ensure that the buffer size 
is bigger:
   
   override val bufferSize: Int = max(4, SQLConf.get.pandasUDFBufferSize)



##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.ipc.ArrowStreamWriter
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.api.python._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.api.python.PythonSQLUtils
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 

[GitHub] [spark] alex-balikov commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


alex-balikov commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974672806


##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -2705,6 +2705,44 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH =
+
buildConf("spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch")
+  .internal()
+  .doc("When using applyInPandasWithState, set a soft limit of the 
accumulated size of " +
+"records that can be written to a single ArrowRecordBatch in memory. 
This is used to " +
+"restrict the amount of memory being used to materialize the data in 
both executor and " +
+"Python worker. The accumulated size of records are calculated via 
sampling a set of " +
+"records. Splitting the ArrowRecordBatch is performed per record, so 
unless a record " +
+"is quite huge, the size of constructed ArrowRecordBatch will be 
around the " +
+"configured value.")
+  .version("3.4.0")
+  .bytesConf(ByteUnit.BYTE)
+  .createWithDefaultString("64MB")
+
+  val MAP_PANDAS_UDF_WITH_STATE_MIN_DATA_COUNT_FOR_SAMPLE =

Review Comment:
   I wonder if we really need this param. Ultimately, if the sizing estimate works 
badly, users can just set a lower value for the batch size limit. I do not think it 
is useful to let them tune this parameter.



##
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##
@@ -620,6 +622,35 @@ class RelationalGroupedDataset protected[sql](
 Dataset.ofRows(df.sparkSession, plan)
   }
 
+  private[sql] def applyInPandasWithState(

Review Comment:
   Please add a method-level comment here.



##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -2705,6 +2705,44 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH =
+
buildConf("spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch")
+  .internal()
+  .doc("When using applyInPandasWithState, set a soft limit of the 
accumulated size of " +
+"records that can be written to a single ArrowRecordBatch in memory. 
This is used to " +
+"restrict the amount of memory being used to materialize the data in 
both executor and " +
+"Python worker. The accumulated size of records are calculated via 
sampling a set of " +
+"records. Splitting the ArrowRecordBatch is performed per record, so 
unless a record " +
+"is quite huge, the size of constructed ArrowRecordBatch will be 
around the " +
+"configured value.")
+  .version("3.4.0")
+  .bytesConf(ByteUnit.BYTE)
+  .createWithDefaultString("64MB")

Review Comment:
   I agree that expressing the limit in terms of bytes is more meaningful than records. 
However, we estimate the byte size efficiently. Specifically, here I would rename 
'softLimitSizePerBatch' by removing 'soft' (we can clarify that in the comment) and 
including 'Bytes': 'batchSizeLimitBytes'. I also wonder whether we should keep the 
property specific to applyInPandasWithState or make it general - remove the 
applyInPandasWithState scoping even if we do not support this limit elsewhere 
initially; it seems generally meaningful, and we can follow up on fixing the other 
places as a bug.
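   A hedged sketch of what the suggested rename could look like, following the `buildConf` 
pattern from the diff above. The key and constant name here are only the reviewer's 
proposal / an illustration, not a merged config, and the snippet assumes it sits inside 
`object SQLConf`, where `buildConf` and `ByteUnit` are in scope:
   
   ```scala
   val ARROW_BATCH_SIZE_LIMIT_BYTES =
     buildConf("spark.sql.execution.arrow.batchSizeLimitBytes")
       .internal()
       .doc("Limit, in bytes, on the estimated size of a single ArrowRecordBatch " +
         "constructed when sending data to the Python worker.")
       .version("3.4.0")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("64MB")
   ```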



##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -2705,6 +2705,44 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH =
+
buildConf("spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch")
+  .internal()
+  .doc("When using applyInPandasWithState, set a soft limit of the 
accumulated size of " +
+"records that can be written to a single ArrowRecordBatch in memory. 
This is used to " +
+"restrict the amount of memory being used to materialize the data in 
both executor and " +
+"Python worker. The accumulated size of records are calculated via 
sampling a set of " +
+"records. Splitting the ArrowRecordBatch is performed per record, so 
unless a record " +
+"is quite huge, the size of constructed ArrowRecordBatch will be 
around the " +
+"configured value.")
+  .version("3.4.0")
+  .bytesConf(ByteUnit.BYTE)
+  .createWithDefaultString("64MB")
+
+  val MAP_PANDAS_UDF_WITH_STATE_MIN_DATA_COUNT_FOR_SAMPLE =
+
buildConf("spark.sql.execution.applyInPandasWithState.minDataCountForSample")
+  .internal()
+  .doc("When using applyInPandasWithState, specify the minimum number of 
records to sample " +
+"the size of record. The size being retrieved from sampling will be 
used to estimate " +
+"the accumulated size of records. Note that limiting by size does not 

[GitHub] [spark] MaxGekk commented on pull request #37921: [SPARK-40479][SQL] Migrate unexpected input type error to an error class

2022-09-19 Thread GitBox


MaxGekk commented on PR #37921:
URL: https://github.com/apache/spark/pull/37921#issuecomment-1251528429

   @srielau @anchovYu Could you take a look at the PR, please.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] leewyang commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-09-19 Thread GitBox


leewyang commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r974649030


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if batch_size <= 0 or batch_size >= len(df):
+yield df
+else:
+# for batch in np.array_split(df, (len(df.index) + batch_size - 1) // 
batch_size):
+for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+yield batch
+
+
+def has_tensor_cols(df: pd.DataFrame) -> bool:
+"""Check if input DataFrame contains any tensor-valued columns"""
+if any(df.dtypes == np.object_):
+# pd.DataFrame object types can contain different types, e.g. string, 
dates, etc.
+# so inspect a row and check for array/list type
+sample = df.iloc[0]
+return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in 
sample])
+else:
+return False
+
+
+def batch_infer_udf(
+predict_batch_fn: Callable,
+return_type: DataType = ArrayType(FloatType()),
+batch_size: int = -1,
+input_names: list[str] = [],

Review Comment:
   - the mutable `[]` default is fixed.
   - for now, `input_names` is an explicit signal to the code that the user expects 
multiple named inputs; for most simple (single-input) cases it wouldn't be 
used/required.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] leewyang commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-09-19 Thread GitBox


leewyang commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r974646917


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if batch_size <= 0 or batch_size >= len(df):
+yield df
+else:
+# for batch in np.array_split(df, (len(df.index) + batch_size - 1) // 
batch_size):
+for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+yield batch
+
+
+def has_tensor_cols(df: pd.DataFrame) -> bool:
+"""Check if input DataFrame contains any tensor-valued columns"""
+if any(df.dtypes == np.object_):
+# pd.DataFrame object types can contain different types, e.g. string, 
dates, etc.
+# so inspect a row and check for array/list type
+sample = df.iloc[0]
+return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in 
sample])
+else:
+return False
+
+
+def batch_infer_udf(
+predict_batch_fn: Callable,
+return_type: DataType = ArrayType(FloatType()),
+batch_size: int = -1,
+input_names: list[str] = [],
+input_tensor_shapes: list[list[int]] = [],
+**kwargs: Any,
+) -> Callable:
+"""Given a function which loads a model, returns a pandas_udf for 
inferencing over that model.
+
+This will handle:
+- conversion of the Spark DataFrame to numpy arrays.
+- batching of the inputs sent to the model predict() function.
+- caching of the model and prediction function on the executors.
+
+This assumes that the `predict_batch_fn` encapsulates all of the necessary 
dependencies for
+running the model or the Spark executor environment already satisfies all 
runtime requirements.
+
+When selecting columns in pyspark SQL, users are required to always use 
`struct` for simplicity.
+
+For the conversion of Spark DataFrame to numpy, the following table 
describes the behavior,
+where tensor columns in the Spark DataFrame must be represented as a 
flattened 1-D array/list.
+
+| dataframe \\ model | single input | multiple inputs |
+| :- | :--- | :-- |
+| single-col scalar  | 1| N/A |
+| single-col tensor  | 1,2  | N/A |
+| multi-col scalar   | 3| 4   |

Review Comment:
   I think it's fine to leave this for now... Otherwise, we would just add an 
error response in this case to tell folks to use `array`.  So it seems it'd be 
better to just support it vs. raising an error (unless this causes issues 
elsewhere).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] xinrong-meng commented on pull request #37908: [SPARK-40196][PS][FOLLOWUP] `SF.lit` -> `F.lit` in `window.quantile`

2022-09-19 Thread GitBox


xinrong-meng commented on PR #37908:
URL: https://github.com/apache/spark/pull/37908#issuecomment-1251489189

   Thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] leewyang commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-09-19 Thread GitBox


leewyang commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r974626628


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
 return 
Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]:
+"""Generator that splits a pandas dataframe/series into batches."""
+if batch_size <= 0 or batch_size >= len(df):
+yield df
+else:
+# for batch in np.array_split(df, (len(df.index) + batch_size - 1) // 
batch_size):
+for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+yield batch
+
+
+def has_tensor_cols(df: pd.DataFrame) -> bool:
+"""Check if input DataFrame contains any tensor-valued columns"""
+if any(df.dtypes == np.object_):
+# pd.DataFrame object types can contain different types, e.g. string, 
dates, etc.
+# so inspect a row and check for array/list type
+sample = df.iloc[0]
+return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in 
sample])
+else:
+return False
+
+
+def batch_infer_udf(
+predict_batch_fn: Callable,
+return_type: DataType = ArrayType(FloatType()),
+batch_size: int = -1,
+input_names: list[str] = [],
+input_tensor_shapes: list[list[int]] = [],
+**kwargs: Any,
+) -> Callable:
+"""Given a function which loads a model, returns a pandas_udf for 
inferencing over that model.
+
+This will handle:
+- conversion of the Spark DataFrame to numpy arrays.
+- batching of the inputs sent to the model predict() function.
+- caching of the model and prediction function on the executors.
+
+This assumes that the `predict_batch_fn` encapsulates all of the necessary 
dependencies for
+running the model or the Spark executor environment already satisfies all 
runtime requirements.
+
+When selecting columns in pyspark SQL, users are required to always use 
`struct` for simplicity.

Review Comment:
   Updated the code to support `col` and `str` column selections, so `struct` 
is not a requirement any more.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] kazuyukitanimura commented on pull request #37934: [SPARK-40477][SQL] Support `NullType` in `ColumnarBatchRow`

2022-09-19 Thread GitBox


kazuyukitanimura commented on PR #37934:
URL: https://github.com/apache/spark/pull/37934#issuecomment-1251489126

   cc @sunchao @viirya @flyrain


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] xinrong-meng commented on pull request #37912: [SPARK-40196][PYTHON][PS][FOLLOWUP] Skip SparkFunctionsTests.test_repeat

2022-09-19 Thread GitBox


xinrong-meng commented on PR #37912:
URL: https://github.com/apache/spark/pull/37912#issuecomment-1251488032

   Thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] xinrong-meng commented on pull request #37888: [SPARK-40196][PYTHON][PS] Consolidate `lit` function with NumPy scalar in sql and pandas module

2022-09-19 Thread GitBox


xinrong-meng commented on PR #37888:
URL: https://github.com/apache/spark/pull/37888#issuecomment-1251487641

   Thank you @HyukjinKwon @zhengruifeng @Yikun for taking care of the merging!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] kazuyukitanimura opened a new pull request, #37934: [SPARK-40477][SQL] Support `NullType` in `ColumnarBatchRow`

2022-09-19 Thread GitBox


kazuyukitanimura opened a new pull request, #37934:
URL: https://github.com/apache/spark/pull/37934

   ### What changes were proposed in this pull request?
   This PR proposes to support `NullType` in `ColumnarBatchRow`.
   
   
   ### Why are the changes needed?
   `ColumnarBatchRow.get()` does not currently support `NullType`.
   Supporting `NullType` in `ColumnarBatchRow` allows `NullType` to be used, for example, for 
partition columns.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Test added


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-19 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r974611991


##
connect/src/main/scala/org/apache/spark/sql/sparkconnect/planner/SparkConnectPlanner.scala:
##
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sparkconnect.planner
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{expressions, plans}
+import org.apache.spark.sql.catalyst.analysis.{
+  UnresolvedAlias,
+  UnresolvedAttribute,
+  UnresolvedFunction,
+  UnresolvedRelation,
+  UnresolvedStar

Review Comment:
   Basically, if I run
   
   ```
   import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, 
UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
   ```
   
   through
   
   ```
   ./build/mvn -Pscala-2.12 scalafmt:format -Dscalafmt.skip=false 
-Dscalafmt.onlyChanged=false -pl connect
   ```
   
   it becomes this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #37924: [SPARK-40481][CORE] Ignore stage fetch failure caused by decommissioned executor

2022-09-19 Thread GitBox


mridulm commented on code in PR #37924:
URL: https://github.com/apache/spark/pull/37924#discussion_r974600104


##
docs/configuration.md:
##
@@ -2605,6 +2605,15 @@ Apart from these, the following properties are also 
available, and may be useful
   
   2.2.0
 
+
+  spark.stage.attempt.ignoreOnDecommissionFetchFailure

Review Comment:
   `spark.stage.attempt.ignoreOnDecommissionFetchFailure` -> 
`spark.stage.ignoreOnDecommissionFetchFailure`



##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -1860,8 +1867,18 @@ private[spark] class DAGScheduler(
 s"(attempt ${failedStage.latestInfo.attemptNumber}) running")
 } else {
   failedStage.failedAttemptIds.add(task.stageAttemptId)
+  val ignoreStageFailure = ignoreDecommissionFetchFailure &&
+isExecutorDecommissioned(taskScheduler, bmAddress)
+  if (ignoreStageFailure) {
+logInfo("Ignoring fetch failure from $task of $failedStage attempt 
" +
+  s"${task.stageAttemptId} when count 
spark.stage.maxConsecutiveAttempts " +
+  "as executor ${bmAddress.executorId} is decommissioned and " +
+  s" ${config.STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE.key}=true")
+  }
+
   val shouldAbortStage =
-failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts ||
+(!ignoreStageFailure &&
+  failedStage.failedAttemptIds.size >= 
maxConsecutiveStageAttempts) ||
 disallowStageRetryForTest

Review Comment:
   QQ: We are preventing the immediate failure from aborting the stage, but might we 
effectively be reducing the number of stage failures that can be tolerated?
   
   For example:
   attempt 0, attempt 1, attempt 2 failed due to decommission
   attempt 3 failed for other reasons -> job failed (assuming 
maxConsecutiveStageAttempts = 4)
   
   Is this the behavior we will now exhibit?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-19 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r974602945


##
connect/src/main/scala/org/apache/spark/sql/sparkconnect/planner/SparkConnectPlanner.scala:
##
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sparkconnect.planner
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{expressions, plans}
+import org.apache.spark.sql.catalyst.analysis.{
+  UnresolvedAlias,
+  UnresolvedAttribute,
+  UnresolvedFunction,
+  UnresolvedRelation,
+  UnresolvedStar

Review Comment:
   I'm running this through `scalafmt` and checkstyle and it doesn't complain. 
What's the best way to solve this automatically?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on a diff in pull request #37710: [SPARK-40448][CONNECT] Spark Connect build as Driver Plugin with Shaded Dependencies

2022-09-19 Thread GitBox


grundprinzip commented on code in PR #37710:
URL: https://github.com/apache/spark/pull/37710#discussion_r974599749


##
connect/src/main/scala/org/apache/spark/sql/sparkconnect/command/SparkConnectCommandPlanner.scala:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sparkconnect.command

Review Comment:
   I'll change to connect.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #37928: [SPARK-40485][SQL] Extend the partitioning options of the JDBC data source

2022-09-19 Thread GitBox


AmplabJenkins commented on PR #37928:
URL: https://github.com/apache/spark/pull/37928#issuecomment-1251440593

   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] shrprasa commented on pull request #37880: [SPARK-39399] [CORE] [K8S]: Fix proxy-user authentication for Spark on k8s in cluster deploy mode

2022-09-19 Thread GitBox


shrprasa commented on PR #37880:
URL: https://github.com/apache/spark/pull/37880#issuecomment-1251427562

   @gaborgsomogyi  @dongjoon-hyun @HyukjinKwon Can you please review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dtenedor commented on a diff in pull request #37840: [SPARK-40416][SQL] Move subquery expression CheckAnalysis error messages to use the new error framework

2022-09-19 Thread GitBox


dtenedor commented on code in PR #37840:
URL: https://github.com/apache/spark/pull/37840#discussion_r974575955


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##
@@ -730,6 +729,13 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
 }
   }
 
+  private def canonicalizeForError(expr: LogicalPlan): String =
+if (SQLConf.get.getConf(SQLConf.CANONICALIZE_PLANS_IN_ERRORS)) {

Review Comment:
   @gengliangwang thanks for the suggestion! This is done, it is simpler now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] alex-balikov commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


alex-balikov commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974517188


##
python/pyspark/worker.py:
##
@@ -361,6 +429,32 @@ def read_udfs(pickleSer, infile, eval_type):
 
 if eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
 ser = CogroupUDFSerializer(timezone, safecheck, 
assign_cols_by_name)
+elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
+soft_limit_bytes_per_batch = runner_conf.get(
+
"spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch",
+(64 * 1024 * 1024),
+)
+soft_limit_bytes_per_batch = int(soft_limit_bytes_per_batch)
+
+min_data_count_for_sample = runner_conf.get(
+
"spark.sql.execution.applyInPandasWithState.minDataCountForSample", 100

Review Comment:
   Similar comment about the property names and default values here and 
everywhere else - can they be defined in a more prominent place?



##
python/pyspark/worker.py:
##
@@ -361,6 +429,32 @@ def read_udfs(pickleSer, infile, eval_type):
 
 if eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
 ser = CogroupUDFSerializer(timezone, safecheck, 
assign_cols_by_name)
+elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
+soft_limit_bytes_per_batch = runner_conf.get(
+
"spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch",
+(64 * 1024 * 1024),

Review Comment:
   Can the default value be defined in some more prominent place? Also the 
property names.



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -311,6 +323,56 @@ object UnsupportedOperationChecker extends Logging {
 }
   }
 
+// applyInPandasWithState
+case m: FlatMapGroupsInPandasWithState if m.isStreaming =>
+  // Check compatibility with output modes and aggregations in query
+  val aggsInQuery = collectStreamingAggregates(plan)
+
+  if (aggsInQuery.isEmpty) {
+// applyInPandasWithState without aggregation: operation's output 
mode must

Review Comment:
   Why do we even have an operation output mode? We are defining a new API - can we 
just drop this parameter from the API if we are going to enforce that it 
matches the query's output mode?



##
python/pyspark/worker.py:
##
@@ -361,6 +429,32 @@ def read_udfs(pickleSer, infile, eval_type):
 
 if eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
 ser = CogroupUDFSerializer(timezone, safecheck, 
assign_cols_by_name)
+elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
+soft_limit_bytes_per_batch = runner_conf.get(
+
"spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch",
+(64 * 1024 * 1024),
+)
+soft_limit_bytes_per_batch = int(soft_limit_bytes_per_batch)
+
+min_data_count_for_sample = runner_conf.get(
+
"spark.sql.execution.applyInPandasWithState.minDataCountForSample", 100
+)
+min_data_count_for_sample = int(min_data_count_for_sample)
+
+soft_timeout_millis_purge_batch = runner_conf.get(
+
"spark.sql.execution.applyInPandasWithState.softTimeoutPurgeBatch", 100

Review Comment:
   same



##
python/pyspark/worker.py:
##
@@ -207,6 +209,65 @@ def wrapped(key_series, value_series):
 return lambda k, v: [(wrapped(k, v), to_arrow_type(return_type))]
 
 
+def wrap_grouped_map_pandas_udf_with_state(f, return_type):
+def wrapped(key_series, value_series_gen, state):
+import pandas as pd
+
+key = tuple(s[0] for s in key_series)
+
+if state.hasTimedOut:
+# Timeout processing pass empty iterator. Here we return an empty 
DataFrame instead.
+values = [
+pd.DataFrame(columns=pd.concat(next(value_series_gen), 
axis=1).columns),
+]
+else:
+values = (pd.concat(x, axis=1) for x in value_series_gen)
+
+result_iter = f(key, values, state)
+
+def verify_element(result):
+if not isinstance(result, pd.DataFrame):
+raise TypeError(
+"The type of element in return iterator of the 
user-defined function "
+"should be pandas.DataFrame, but is 
{}".format(type(result))
+)
+# the number of columns of result have to match the return type
+# but it is fine for result to have no columns at all if it is 
empty
+if not (

Review Comment:
   if not ... ?



##
python/pyspark/worker.py:
##
@@ -486,6 +580,35 @@ def mapper(a):
 vals = [a[o] for o in parsed_offsets[0][1]]

[GitHub] [spark] roczei commented on pull request #37679: [SPARK-35242][SQL] Support changing session catalog's default database

2022-09-19 Thread GitBox


roczei commented on PR #37679:
URL: https://github.com/apache/spark/pull/37679#issuecomment-1251364841

   Thanks @cloud-fan, I have implemented this and all tests passed. As far as I can see, we 
have resolved all of your feedback.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on pull request #37922: [WIP][SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

2022-09-19 Thread GitBox


mridulm commented on PR #37922:
URL: https://github.com/apache/spark/pull/37922#issuecomment-1251349810

   > The push-based shuffle service will auto clean up the old shuffle merge 
data
   
   Consider the case I mentioned above - stage retry for an `INDETERMINATE` 
stage.
   We clean up previous attempts only if the new attempt happens to use the same 
merger.
   
   The previous attempt's mergers are not reused for the next attempt - and so 
the previous mergers will continue to hold stale data without cleaning it up 
- until the application terminates.
   Note - any merger which happens to be reused in the new stage attempt will 
clean up; I am referring to those which are not reused: the old attempt's `mergerLocs` 
minus the new attempt's `mergerLocs`.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ayudovin commented on a diff in pull request #37923: [SPARK-40334][PS] Implement `GroupBy.prod`

2022-09-19 Thread GitBox


ayudovin commented on code in PR #37923:
URL: https://github.com/apache/spark/pull/37923#discussion_r974514017


##
python/pyspark/pandas/groupby.py:
##
@@ -993,6 +993,98 @@ def nth(self, n: int) -> FrameLike:
 
 return self._prepare_return(DataFrame(internal))
 
+def prod(self, numeric_only: Optional[bool] = True, min_count: int = 0):
+"""
+Compute prod of groups.
+
+Parameters
+--
+numeric_only : bool, default False
+Include only float, int, boolean columns. If None, will attempt to use
+everything, then use only numeric data.
+
+min_count: int, default 0
+The required number of valid values to perform the operation.
+If fewer than min_count non-NA values are present the result will be 
NA.
+
+.. versionadded:: 3.4.0
+
+Returns
+---
+pyspark.pandas.Series or pyspark.pandas.DataFrame
+
+See Also
+
+pyspark.pandas.Series.groupby
+pyspark.pandas.DataFrame.groupby
+
+Examples
+
+>>> df = ps.DataFrame({'A': [1, 1, 2, 1, 2],
+...'B': [np.nan, 2, 3, 4, 5],
+...'C': [1, 2, 1, 1, 2],
+...'D': [True, False, True, False, True]})
+
+Groupby one column and return the prod of the remaining columns in
+each group.
+
+>>> df.groupby('A').prod().sort_index()
+ B  C  D
+A
+1  8.0  2  0
+2  15.0 2  11
+
+>>> df.groupby('A').prod(min_count=3).sort_index()
+ B  C   D
+A
+1  NaN  2  0
+2  NaN NaN  NaN
+"""
+
+self._validate_agg_columns(numeric_only=numeric_only, 
function_name="prod")
+
+groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in 
range(len(self._groupkeys))]
+internal, agg_columns, sdf = self._prepare_reduce(
+groupkey_names=groupkey_names,
+accepted_spark_types=(NumericType, BooleanType),
+bool_to_numeric=True,
+)
+
+psdf: DataFrame = DataFrame(internal)
+if len(psdf._internal.column_labels) > 0:
+
+stat_exprs = []
+for label in psdf._internal.column_labels:
+psser = psdf._psser_for(label)
+column = psser._dtype_op.nan_to_null(psser).spark.column
+data_type = psser.spark.data_type
+
+if isinstance(data_type, IntegralType):
+
stat_exprs.append(F.product(column).cast(data_type).alias(f"{label[0]}"))
+else:
+stat_exprs.append(F.product(column).alias(f"{label[0]}"))
+
+stat_exprs.append(F.count(column).alias(f"{label[0]}_count"))

Review Comment:
   yes, you're right



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] pralabhkumar commented on pull request #37417: [SPARK-33782][K8S][CORE]Place spark.files, spark.jars and spark.files under the current working directory on the driver in K8S cluster m

2022-09-19 Thread GitBox


pralabhkumar commented on PR #37417:
URL: https://github.com/apache/spark/pull/37417#issuecomment-1251334364

   @dongjoon-hyun, I have incorporated all the review comments; please take a look. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #37930: [SPARK-40487][SQL] Make defaultJoin in BroadcastNestedLoopJoinExec running in parallel

2022-09-19 Thread GitBox


AmplabJenkins commented on PR #37930:
URL: https://github.com/apache/spark/pull/37930#issuecomment-1251324416

   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] xiaonanyang-db opened a new pull request, #37933: SPARK-40474 Infer columns with mixed date and timestamp as String in CSV schema inference

2022-09-19 Thread GitBox


xiaonanyang-db opened a new pull request, #37933:
URL: https://github.com/apache/spark/pull/37933

   
   
   ### What changes were proposed in this pull request?
   
   Adjust part of the changes in https://github.com/apache/spark/pull/36871. 
   In the PR above, we introduced support for the date type in CSV schema 
inference. The schema inference behavior on date-time columns is now:
   - For a column containing only dates, we infer it as Date type
   - For a column containing only timestamps, we infer it as Timestamp type
   - For a column containing a mixture of dates and timestamps, we infer 
it as Timestamp type
   
   However, we found that we were too ambitious about the last scenario: supporting 
it introduced a lot of complexity in the code and raised many performance concerns. 
Thus, we want to simplify the behavior of the last scenario (see the sketch after 
this list) to:
   
   - For a column containing a mixture of dates and timestamps, we will infer 
it as String type
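   A minimal sketch of the simplified merge rule described above (the function name is 
illustrative; the real logic lives in CSVInferSchema's type merging):
   
   ```scala
   import org.apache.spark.sql.types._
   
   // Once a column has seen both dates and timestamps, fall back to StringType.
   def mergeDateTimeTypes(t1: DataType, t2: DataType): DataType = (t1, t2) match {
     case (DateType, DateType)           => DateType
     case (TimestampType, TimestampType) => TimestampType
     case _                              => StringType   // mixed dates and timestamps
   }
   ```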
   
   ### Why are the changes needed?
   Simplify CSV dateTime inference logic.
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   No, compared to the previous PR.
   
   
   
   ### How was this patch tested?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] xkrogen commented on a diff in pull request #37634: [SPARK-40199][SQL] Provide useful error when projecting a non-null column encounters null value

2022-09-19 Thread GitBox


xkrogen commented on code in PR #37634:
URL: https://github.com/apache/spark/pull/37634#discussion_r974499166


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala:
##
@@ -252,28 +267,44 @@ object GenerateUnsafeProjection extends 
CodeGenerator[Seq[Expression], UnsafePro
  """.stripMargin
   }
 
+  /**
+   * Wrap `inputExpr` in a try-catch block that will catch any 
[[NullPointerException]] that is
+   * thrown, instead throwing a (more helpful) error message as provided by
+   * 
[[org.apache.spark.sql.errors.QueryExecutionErrors.valueCannotBeNullError]].
+   */
+  private def wrapWithNpeHandling(inputExpr: String, descPath: Seq[String]): 
String =
+s"""
+   |try {
+   |  ${inputExpr.trim}

Review Comment:
   I prefer exception-catching as it handles this issue with zero overhead. 
Adding a null-check here essentially falls back to the logic for a nullable 
schema:
   
https://github.com/apache/spark/blob/0494dc90af48ce7da0625485a4dc6917a244d580/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala#L119-L133
   From the benchmark results, we can see that there is nontrivial overhead for 
the null-check; for the simple case of a projection of a primitive, the 
overhead is almost 50%:
   
https://github.com/apache/spark/blob/2a1f9767213c321bd52e7714fa3b5bfc4973ba40/sql/catalyst/benchmarks/UnsafeProjectionBenchmark-jdk17-results.txt#L9-L10
   
   You call out the situation of a null silently being replaced with a default 
value; this is a good point. I'm not sure how we can handle that without 
additional overhead of an explicit check. It seems that the default value 
replacement logic is coming from [Scala's own unboxing 
logic](https://github.com/scala/scala/blob/986dcc160aab85298f6cab0bf8dd0345497cdc01/src/library/scala/runtime/BoxesRunTime.java#L102).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] xkrogen commented on pull request #37634: [SPARK-40199][SQL] Provide useful error when projecting a non-null column encounters null value

2022-09-19 Thread GitBox


xkrogen commented on PR #37634:
URL: https://github.com/apache/spark/pull/37634#issuecomment-1251319065

   Thanks for the suggestion @cloud-fan! Good point about there being many places 
where Spark trusts nullability. Here I am trying to target places where _user 
code_ could introduce a null. This includes data sources (including in-memory 
dataset creation like `sparkContext.parallelize()` or `DatasetHolder.toDS()`) 
and UDFs (including lambdas in calls to DF/DS APIs). User code is inherently 
untrustworthy, so more checks are warranted. I think any of these places where 
user code supplies values would be covered by this PR since they all go through 
projection before being accessed by other codepaths, but LMK if you disagree. I 
guess one situation could be if the optimizer completely removes some operator 
because of the nullability, so the data is never even accessed--are you 
thinking about situations like this?
   
   Unfortunately we cannot use `AssertNotNull` for this; the optimizer will 
remove it if the input schema indicates that the schema is non-null:
   
https://github.com/apache/spark/blob/96831bbb6749910c8f9497c048ba05e6e317649f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala#L826
   And that optimizer rule is there for a good reason; there is nontrivial 
overhead associated with the null check as discussed further in [my other 
comment](https://github.com/apache/spark/pull/37634#discussion_r974499166).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Yaohua628 commented on pull request #37905: [SPARK-40460][SS] Fix streaming metrics when selecting `_metadata`

2022-09-19 Thread GitBox


Yaohua628 commented on PR #37905:
URL: https://github.com/apache/spark/pull/37905#issuecomment-1251311583

   > There's conflict in branch-3.3. @Yaohua628 Could you please craft a PR for 
branch-3.3? Thanks in advance!
   
   Done! https://github.com/apache/spark/pull/37932 - Thank you


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Yaohua628 commented on pull request #37932: [SPARK-40460][SS][3.3] Fix streaming metrics when selecting _metadata

2022-09-19 Thread GitBox


Yaohua628 commented on PR #37932:
URL: https://github.com/apache/spark/pull/37932#issuecomment-1251310801

   cc @HeartSaVioR 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Yaohua628 opened a new pull request, #37932: [SPARK-40460][SS][3.3] Fix streaming metrics when selecting _metadata

2022-09-19 Thread GitBox


Yaohua628 opened a new pull request, #37932:
URL: https://github.com/apache/spark/pull/37932

   
   
   ### What changes were proposed in this pull request?
   
   Cherry-picked from #37905 
   
   Streaming metrics report all 0 (`processedRowsPerSecond`, etc.) when selecting
the `_metadata` column, because the logical plan from the batch and the actual
planned logical plan are mismatched. As a result,
[here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L348)
we cannot find the plan and therefore cannot collect the metrics correctly.
   
   This PR fixes the mismatch by replacing the initial `LogicalPlan` with the
`LogicalPlan` that contains the metadata column.
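   A rough repro sketch of the symptom (illustrative only; the input path,
schema, checkpoint location, and sink below are placeholders, not taken from the
new tests):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// A file-source streaming query that also selects the hidden _metadata column.
val query = spark.readStream
  .format("csv")
  .schema("value STRING")
  .load("/tmp/metadata-metrics-repro-input")          // placeholder directory
  .select("value", "_metadata")
  .writeStream
  .format("noop")
  .option("checkpointLocation", "/tmp/metadata-metrics-repro-ckpt")
  .start()

// Before this fix, progress metrics such as processedRowsPerSecond stayed at 0
// even while rows were flowing, because ProgressReporter could not match the
// planned logical plan back to the batch's logical plan.
// query.lastProgress.processedRowsPerSecond
```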
   
   
   ### Why are the changes needed?
   Bug fix.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Existing + New UTs
   





[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #37924: [SPARK-40481][CORE] Ignore stage fetch failure caused by decommissioned executor

2022-09-19 Thread GitBox


dongjoon-hyun commented on code in PR #37924:
URL: https://github.com/apache/spark/pull/37924#discussion_r974491025


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -2159,6 +2171,16 @@ private[spark] class DAGScheduler(
 }
   }
 
+  private def isExecutorDecommissioned(bmAddress: BlockManagerId) = {

Review Comment:
   If you don't mind, please add `taskScheduler` as a parameter here and use it
inside the function body.
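   For example, something along these lines (a sketch of the suggested
signature only; the body is a guess at the existing check and should simply move
here unchanged):

```scala
// Inside DAGScheduler:
private def isExecutorDecommissioned(
    taskScheduler: TaskScheduler,
    bmAddress: BlockManagerId): Boolean = {
  bmAddress != null &&
    taskScheduler.getExecutorDecommissionState(bmAddress.executorId).isDefined
}
```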






[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #37924: [SPARK-40481][CORE] Ignore stage fetch failure caused by decommissioned executor

2022-09-19 Thread GitBox


dongjoon-hyun commented on code in PR #37924:
URL: https://github.com/apache/spark/pull/37924#discussion_r974490653


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -1860,8 +1863,17 @@ private[spark] class DAGScheduler(
 s"(attempt ${failedStage.latestInfo.attemptNumber}) running")
 } else {
   failedStage.failedAttemptIds.add(task.stageAttemptId)
+  val ignoreStageFailure = ignoreDecommissionFetchFailure &&
+isExecutorDecommissioned(bmAddress)
+  if (ignoreStageFailure) {
+logInfo("Ignoring fetch failure from $task of $failedStage 
attempt" +

Review Comment:
   This log message might be misleading or wrong when 
`disallowStageRetryForTest` is true.
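   For instance, one way to keep it accurate (illustrative only; the guard and
wording are a sketch, and note the string also needs the `s` interpolator for
`$task`/`$failedStage` to be substituted):

```scala
// Sketch, not the code in the PR: only claim the failure is being ignored when
// it actually will be, and interpolate the placeholders.
if (ignoreStageFailure && !disallowStageRetryForTest) {
  logInfo(s"Ignoring fetch failure from $task of $failedStage " +
    s"(attempt ${task.stageAttemptId}) caused by a decommissioned executor")
}
```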






[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #37924: [SPARK-40481][CORE] Ignore stage fetch failure caused by decommissioned executor

2022-09-19 Thread GitBox


dongjoon-hyun commented on code in PR #37924:
URL: https://github.com/apache/spark/pull/37924#discussion_r974486551


##
docs/configuration.md:
##
@@ -2605,6 +2605,15 @@ Apart from these, the following properties are also available, and may be useful
   
   2.2.0
 
+
+  spark.stage.attempt.ignoreOnDecommissionFetchFailure
+  false
+  
+Whether ignore stage fetch failure caused by executor decommission when 

Review Comment:
   Please remove the trailing space at the end of this line.





