[GitHub] [spark] wankunde closed pull request #38672: [SPARK-41159][SQL] Optimize like any and like all expressions

2022-12-06 Thread GitBox


wankunde closed pull request #38672: [SPARK-41159][SQL] Optimize like any and 
like all expressions
URL: https://github.com/apache/spark/pull/38672


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wankunde commented on pull request #38672: [SPARK-41159][SQL] Optimize like any and like all expressions

2022-12-06 Thread GitBox


wankunde commented on PR #38672:
URL: https://github.com/apache/spark/pull/38672#issuecomment-1340543632

   After `LikeSimplification`,  the combination of multiple like expressions 
with `OR` can be pushdown to parquet reader, while `like any` can not.
   So close this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer opened a new pull request, #38962: [SPARK-40852][CONNECT][PYTHON] Add document for `DataFrame.summary`

2022-12-06 Thread GitBox


beliefer opened a new pull request, #38962:
URL: https://github.com/apache/spark/pull/38962

   ### What changes were proposed in this pull request?
   This PR adds document for `DataFrame.summary`.
   
   
   ### Why are the changes needed?
   This PR adds document for `DataFrame.summary`.
   
   
   ### Does this PR introduce _any_ user-facing change?
   'No'.
   `DataFrame.summary` is a new API
   
   
   ### How was this patch tested?
   N/A
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38880: [SPARK-38277][SS] Clear write batch after RocksDB state store's commit

2022-12-06 Thread GitBox


HeartSaVioR commented on code in PR #38880:
URL: https://github.com/apache/spark/pull/38880#discussion_r1041871281


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -310,6 +311,9 @@ class RocksDB(
 "checkpoint" -> checkpointTimeMs,
 "fileSync" -> fileSyncTimeMs
   )
+  // reset resources as we already pushed the changes and it has been 
committed

Review Comment:
   Actually I intended to put this here, as I thought we have to clean up only 
when it is successfully committed. But looks like it's also safe to place this 
to `finally`, as we invalidate the loaded version in any way if any exception 
happens. Will do.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] grundprinzip commented on pull request #38879: [SPARK-41362][CONNECT][PYTHON] Better error messages for invalid argument types.

2022-12-06 Thread GitBox


grundprinzip commented on PR #38879:
URL: https://github.com/apache/spark/pull/38879#issuecomment-1340539415

   @HyukjinKwon @zhengruifeng @amaliujia more opinions?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on pull request #38914: [SPARK-41381][CONNECT][PYTHON] Implement `count_distinct` and `sum_distinct` functions

2022-12-06 Thread GitBox


zhengruifeng commented on PR #38914:
URL: https://github.com/apache/spark/pull/38914#issuecomment-1340537857

   merged into master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on pull request #38942: [SPARK-41437][SQL] Do not optimize the input query twice for v1 write fallback

2022-12-06 Thread GitBox


cloud-fan commented on PR #38942:
URL: https://github.com/apache/spark/pull/38942#issuecomment-1340537896

   cc @viirya @gengliangwang 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #38942: [SPARK-41437][SQL] Do not optimize the input query twice for v1 write fallback

2022-12-06 Thread GitBox


cloud-fan commented on code in PR #38942:
URL: https://github.com/apache/spark/pull/38942#discussion_r1041868772


##
sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala:
##
@@ -132,17 +132,21 @@ class V1WriteFallbackSuite extends QueryTest with 
SharedSparkSession with Before
 assert(e3.getMessage.contains("schema"))
   }
 
-  test("fallback writes should only analyze plan once") {
+  test("fallback writes should only analyze/optimize plan once") {

Review Comment:
   ```suggestion
 test("SPARK-41437: fallback writes should only analyze/optimize plan 
once") {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng closed pull request #38914: [SPARK-41381][CONNECT][PYTHON] Implement `count_distinct` and `sum_distinct` functions

2022-12-06 Thread GitBox


zhengruifeng closed pull request #38914: [SPARK-41381][CONNECT][PYTHON] 
Implement `count_distinct` and `sum_distinct` functions
URL: https://github.com/apache/spark/pull/38914


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] jerrypeng commented on a diff in pull request #38880: [SPARK-38277][SS] Clear write batch after RocksDB state store's commit

2022-12-06 Thread GitBox


jerrypeng commented on code in PR #38880:
URL: https://github.com/apache/spark/pull/38880#discussion_r1041868304


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -310,6 +311,9 @@ class RocksDB(
 "checkpoint" -> checkpointTimeMs,
 "fileSync" -> fileSyncTimeMs
   )
+  // reset resources as we already pushed the changes and it has been 
committed

Review Comment:
   Should we do this in the finally clause to prevent any exception/error 
causing this not to run?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

2022-12-06 Thread GitBox


HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041866576


##
connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {
+"message" : [
+  "Kafka data source in Trigger.AvailableNow should provide the same topic 
partitions in pre-fetched offset to end offset for each microbatch. ",
+  "topic-partitions for pre-fetched offset: , 
topic-partitions for end offset: ."
+]
+  },
+  "END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_PREFETCHED" : {
+"message" : [
+  "For Kafka data source with Trigger.AvailableNow, end offset should have 
lower or equal offset per each topic partition than pre-fetched offset.",
+  "pre-fetched offset: , end offset: ."
+]
+  },
+  "LOST_TOPIC_PARTITIONS_IN_END_OFFSET_WITH_TRIGGER_AVAILABLENOW" : {
+"message" : [
+  "Some of partitions in Kafka topic(s) have been lost during running 
query with Trigger.AvailableNow. Make sure topic partitions are not dropped 
during the query run.",

Review Comment:
   Yup, maybe it's not a trivial case that users drop (and recreate) a topic 
which is being read from the running query. Better guidance may be just saying 
restart your query and it will work well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

2022-12-06 Thread GitBox


MaxGekk commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041866109


##
connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {

Review Comment:
   Since this could appear in headers of paragraphs in docs, could you make it 
shorter if it is possible. Also cc @srielau 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on a diff in pull request #38961: [SPARK-41436][CONNECT][PYTHON] Implement `collection` functions: A~C

2022-12-06 Thread GitBox


zhengruifeng commented on code in PR #38961:
URL: https://github.com/apache/spark/pull/38961#discussion_r1041847513


##
python/pyspark/sql/tests/connect/test_connect_function.py:
##
@@ -413,6 +431,144 @@ def test_aggregation_functions(self):
 sdf.groupBy("a").agg(SF.percentile_approx(sdf.b, [0.1, 
0.9])).toPandas(),
 )
 
+def test_collection_functions(self):
+from pyspark.sql import functions as SF
+from pyspark.sql.connect import functions as CF
+
+query = """
+SELECT * FROM VALUES
+(ARRAY('a', 'ab'), ARRAY(1, 2, 3), ARRAY(1, NULL, 3), 1, 2, 'a'),
+(ARRAY('x', NULL), NULL, ARRAY(1, 3), 3, 4, 'x'),
+(NULL, ARRAY(-1, -2, -3), Array(), 5, 6, NULL)
+AS tab(a, b, c, d, e, f)
+"""
+# +-+++---+---++
+# |a|   b|   c|  d|  e|   f|
+# +-+++---+---++
+# |  [a, ab]|   [1, 2, 3]|[1, null, 3]|  1|  2|   a|
+# |[x, null]|null|  [1, 3]|  3|  4|   x|
+# | null|[-1, -2, -3]|  []|  5|  6|null|
+# +-+++---+---++
+
+cdf = self.connect.sql(query)
+sdf = self.spark.sql(query)
+
+for cfunc, sfunc in [
+(CF.array_distinct, SF.array_distinct),
+(CF.array_max, SF.array_max),
+(CF.array_min, SF.array_min),
+]:
+self.assert_eq(
+cdf.select(cfunc("a"), cfunc(cdf.b)).toPandas(),
+sdf.select(sfunc("a"), sfunc(sdf.b)).toPandas(),
+)
+
+for cfunc, sfunc in [
+(CF.array_except, SF.array_except),
+(CF.array_intersect, SF.array_intersect),
+(CF.array_union, SF.array_union),
+(CF.arrays_overlap, SF.arrays_overlap),
+]:
+self.assert_eq(
+cdf.select(cfunc("b", cdf.c)).toPandas(),
+sdf.select(sfunc("b", sdf.c)).toPandas(),
+)
+
+for cfunc, sfunc in [
+(CF.array_position, SF.array_position),
+(CF.array_remove, SF.array_remove),
+]:
+self.assert_eq(
+cdf.select(cfunc(cdf.a, "ab")).toPandas(),
+sdf.select(sfunc(sdf.a, "ab")).toPandas(),
+)
+
+# test array
+self.assert_eq(
+cdf.select(CF.array(cdf.d, "e")).toPandas(),
+sdf.select(SF.array(sdf.d, "e")).toPandas(),
+)
+self.assert_eq(
+cdf.select(CF.array(cdf.d, "e", CF.lit(99))).toPandas(),
+sdf.select(SF.array(sdf.d, "e", SF.lit(99))).toPandas(),
+)
+
+# test array_contains
+self.assert_eq(
+cdf.select(CF.array_contains(cdf.a, "ab")).toPandas(),
+sdf.select(SF.array_contains(sdf.a, "ab")).toPandas(),
+)
+self.assert_eq(
+cdf.select(CF.array_contains(cdf.a, cdf.f)).toPandas(),
+sdf.select(SF.array_contains(sdf.a, sdf.f)).toPandas(),
+)
+
+# test array_join
+self.assert_eq(
+cdf.select(
+CF.array_join(cdf.a, ","), CF.array_join("b", ":"), 
CF.array_join("c", "~")
+).toPandas(),
+sdf.select(
+SF.array_join(sdf.a, ","), SF.array_join("b", ":"), 
SF.array_join("c", "~")
+).toPandas(),
+)
+self.assert_eq(
+cdf.select(
+CF.array_join(cdf.a, ",", "_null_"),
+CF.array_join("b", ":", ".null."),
+CF.array_join("c", "~", "NULL"),
+).toPandas(),
+sdf.select(
+SF.array_join(sdf.a, ",", "_null_"),
+SF.array_join("b", ":", ".null."),
+SF.array_join("c", "~", "NULL"),
+).toPandas(),
+)
+
+# test array_repeat
+self.assert_eq(
+cdf.select(CF.array_repeat(cdf.f, "d")).toPandas(),
+sdf.select(SF.array_repeat(sdf.f, "d")).toPandas(),
+)
+self.assert_eq(
+cdf.select(CF.array_repeat("f", cdf.d)).toPandas(),
+sdf.select(SF.array_repeat("f", sdf.d)).toPandas(),
+)
+# TODO: Make Literal contains DataType
+#   Cannot resolve "array_repeat(f, 3)" due to data type mismatch:

Review Comment:
   I think we may need to revisit 
https://github.com/apache/spark/pull/38800#discussion_r1033281998 at some point,
   we'd better specify the `Datatype` of `3` to IntegerType instead of LongType.
   
   or we can use `Cast` as a workaround when `Cast` is ready.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please 

[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

2022-12-06 Thread GitBox


HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041864100


##
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
 }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+  endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+val tpsForEndOffset = endPartitionOffsets.keySet
+
+if (tpsForPrefetched != tpsForEndOffset) {
+  throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+tpsForPrefetched, tpsForEndOffset)
+}
+
+val endOffsetHasGreaterThanPrefetched = {
+  allDataForTriggerAvailableNow.keySet.exists { tp =>
+val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+val offsetFromEndOffset = endPartitionOffsets(tp)
+offsetFromEndOffset > offsetFromPrefetched
+  }
+}
+if (endOffsetHasGreaterThanPrefetched) {
+  throw 
KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+allDataForTriggerAvailableNow, endPartitionOffsets)
+}
+
+val latestOffsets = 
kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   It's not safe to just pretend that Kafka resource won't change during the 
query run. If it happens, it will end up with unexpected behavior e.g. with 
consumer based grouping, polling from non-exist topic partition may lead to 
timeout of metadata fetch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

2022-12-06 Thread GitBox


HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041864100


##
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
 }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+  endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+val tpsForEndOffset = endPartitionOffsets.keySet
+
+if (tpsForPrefetched != tpsForEndOffset) {
+  throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+tpsForPrefetched, tpsForEndOffset)
+}
+
+val endOffsetHasGreaterThanPrefetched = {
+  allDataForTriggerAvailableNow.keySet.exists { tp =>
+val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+val offsetFromEndOffset = endPartitionOffsets(tp)
+offsetFromEndOffset > offsetFromPrefetched
+  }
+}
+if (endOffsetHasGreaterThanPrefetched) {
+  throw 
KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+allDataForTriggerAvailableNow, endPartitionOffsets)
+}
+
+val latestOffsets = 
kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   It's not safe to just pretend that Kafka resource won't change during the 
query run. If it happens, it will end up with unexpected behavior e.g. consumer 
based grouping, polling from non-exist topic partition may lead to timeout of 
metadata fetch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk commented on a diff in pull request #38937: [SPARK-41406][SQL] Refactor error message for `NUM_COLUMNS_MISMATCH` to make it more generic

2022-12-06 Thread GitBox


MaxGekk commented on code in PR #38937:
URL: https://github.com/apache/spark/pull/38937#discussion_r1041863502


##
sql/core/src/test/resources/sql-tests/results/except-all.sql.out:
##
@@ -230,10 +230,9 @@ org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "NUM_COLUMNS_MISMATCH",
   "messageParameters" : {
-"invalidNumColumns" : "2",
-"invalidOrdinalNum" : "second",
+"leftNumCols" : "1",
 "operator" : "EXCEPTALL",

Review Comment:
   hmm, could you output it as `EXCEPT ALL`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] jerrypeng commented on pull request #38898: [SPARK-41375][SS] Avoid empty latest KafkaSourceOffset

2022-12-06 Thread GitBox


jerrypeng commented on PR #38898:
URL: https://github.com/apache/spark/pull/38898#issuecomment-1340530153

   @wecharyu can you run one batch and then delete all the partitions?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

2022-12-06 Thread GitBox


HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041860498


##
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
 }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+  endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+val tpsForEndOffset = endPartitionOffsets.keySet
+
+if (tpsForPrefetched != tpsForEndOffset) {
+  throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+tpsForPrefetched, tpsForEndOffset)
+}
+
+val endOffsetHasGreaterThanPrefetched = {
+  allDataForTriggerAvailableNow.keySet.exists { tp =>
+val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+val offsetFromEndOffset = endPartitionOffsets(tp)
+offsetFromEndOffset > offsetFromPrefetched
+  }
+}
+if (endOffsetHasGreaterThanPrefetched) {
+  throw 
KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+allDataForTriggerAvailableNow, endPartitionOffsets)
+}
+
+val latestOffsets = 
kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   If we don't fetch the latest offset with latest topic-partitions from Kafka 
again, what we are trying to guard against? If someone turns on failOnDataLoss 
and runs the query with Trigger.AvailableNow, it will just move on when 
specific topic / partition is dropped, leading that it never reaches the end 
state (prepared offset).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

2022-12-06 Thread GitBox


HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041860498


##
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
 }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+  endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+val tpsForEndOffset = endPartitionOffsets.keySet
+
+if (tpsForPrefetched != tpsForEndOffset) {
+  throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+tpsForPrefetched, tpsForEndOffset)
+}
+
+val endOffsetHasGreaterThanPrefetched = {
+  allDataForTriggerAvailableNow.keySet.exists { tp =>
+val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+val offsetFromEndOffset = endPartitionOffsets(tp)
+offsetFromEndOffset > offsetFromPrefetched
+  }
+}
+if (endOffsetHasGreaterThanPrefetched) {
+  throw 
KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+allDataForTriggerAvailableNow, endPartitionOffsets)
+}
+
+val latestOffsets = 
kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   If we don't fetch the latest offset with latest topic-partitions from Kafka 
again, what we are trying to guard against? If someone turns on failOnDataLoss 
and runs the query with Trigger.AvailableNow, it will just move on when 
specific topic partition is dropped, leading that it never reaches the end 
state (prepared offset). Same for topic recreation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

2022-12-06 Thread GitBox


HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041860498


##
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
 }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+  endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+val tpsForEndOffset = endPartitionOffsets.keySet
+
+if (tpsForPrefetched != tpsForEndOffset) {
+  throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+tpsForPrefetched, tpsForEndOffset)
+}
+
+val endOffsetHasGreaterThanPrefetched = {
+  allDataForTriggerAvailableNow.keySet.exists { tp =>
+val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+val offsetFromEndOffset = endPartitionOffsets(tp)
+offsetFromEndOffset > offsetFromPrefetched
+  }
+}
+if (endOffsetHasGreaterThanPrefetched) {
+  throw 
KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+allDataForTriggerAvailableNow, endPartitionOffsets)
+}
+
+val latestOffsets = 
kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   If we don't fetch the latest offset with latest topic-partitions from Kafka 
again, what we are trying to guard against? If someone turns on failOnDataLoss 
and runs the query with Trigger.AvailableNow, it will just move on when 
specific topic partition is dropped, leading that it never reaches the end 
state (prepared offset).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] jerrypeng commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

2022-12-06 Thread GitBox


jerrypeng commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041856979


##
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
 }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+  endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+val tpsForEndOffset = endPartitionOffsets.keySet
+
+if (tpsForPrefetched != tpsForEndOffset) {
+  throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+tpsForPrefetched, tpsForEndOffset)
+}
+
+val endOffsetHasGreaterThanPrefetched = {
+  allDataForTriggerAvailableNow.keySet.exists { tp =>
+val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+val offsetFromEndOffset = endPartitionOffsets(tp)
+offsetFromEndOffset > offsetFromPrefetched
+  }
+}
+if (endOffsetHasGreaterThanPrefetched) {
+  throw 
KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+allDataForTriggerAvailableNow, endPartitionOffsets)
+}
+
+val latestOffsets = 
kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   Why do we need to fetch the latest offset from Kafka again?  This will add 
additional latency.  I know its trigger available now but what does this buy us?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang commented on pull request #38874: [SPARK-41235][SQL][PYTHON]High-order function: array_compact implementation

2022-12-06 Thread GitBox


LuciferYang commented on PR #38874:
URL: https://github.com/apache/spark/pull/38874#issuecomment-1340524315

   > Thanks for reviewing this. @LuciferYang let me know when you think it's 
ready to go.
   
   @HyukjinKwon @zhengruifeng The Scala part is good to me, please further 
review, thanks ~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

2022-12-06 Thread GitBox


HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041856552


##
connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {
+"message" : [
+  "Kafka data source in Trigger.AvailableNow should provide the same topic 
partitions in pre-fetched offset to end offset for each microbatch. ",

Review Comment:
   We are guarding against arbitrary bugs so there is no 100% guarantee that 
this would only happen transiently, but I agree that restarting the query would 
mitigate the issue in most cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on a diff in pull request #38961: [SPARK-41436][CONNECT][PYTHON] Implement `collection` functions: A~C

2022-12-06 Thread GitBox


zhengruifeng commented on code in PR #38961:
URL: https://github.com/apache/spark/pull/38961#discussion_r1041844991


##
python/pyspark/sql/tests/connect/test_connect_function.py:
##
@@ -63,6 +63,24 @@ class SparkConnectFunctionTests(SparkConnectFuncTestCase):
 """These test cases exercise the interface to the proto plan
 generation but do not call Spark."""
 
+def compare_by_show(self, df1: Any, df2: Any):

Review Comment:
   `toPandas` doesn't work correctly on some complex types like `array(struct)` 
`array(map)`, add this method to check by `DataFrame.show` for now...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] jerrypeng commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

2022-12-06 Thread GitBox


jerrypeng commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041848809


##
connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {
+"message" : [
+  "Kafka data source in Trigger.AvailableNow should provide the same topic 
partitions in pre-fetched offset to end offset for each microbatch. ",
+  "topic-partitions for pre-fetched offset: , 
topic-partitions for end offset: ."
+]
+  },
+  "END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_PREFETCHED" : {
+"message" : [
+  "For Kafka data source with Trigger.AvailableNow, end offset should have 
lower or equal offset per each topic partition than pre-fetched offset.",
+  "pre-fetched offset: , end offset: ."
+]
+  },
+  "LOST_TOPIC_PARTITIONS_IN_END_OFFSET_WITH_TRIGGER_AVAILABLENOW" : {
+"message" : [
+  "Some of partitions in Kafka topic(s) have been lost during running 
query with Trigger.AvailableNow. Make sure topic partitions are not dropped 
during the query run.",

Review Comment:
   Same here.  Transient error right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] jerrypeng commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

2022-12-06 Thread GitBox


jerrypeng commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041848355


##
connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {
+"message" : [
+  "Kafka data source in Trigger.AvailableNow should provide the same topic 
partitions in pre-fetched offset to end offset for each microbatch. ",

Review Comment:
   This should be a transient issue right?  As in when the job is triggered 
again, the job should run fine right and number the job will read from 
whichever partitions exist?  Can you add that to this error message?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on a diff in pull request #38961: [SPARK-41436][CONNECT][PYTHON] Implement `collection` functions: A~C

2022-12-06 Thread GitBox


zhengruifeng commented on code in PR #38961:
URL: https://github.com/apache/spark/pull/38961#discussion_r1041847513


##
python/pyspark/sql/tests/connect/test_connect_function.py:
##
@@ -413,6 +431,144 @@ def test_aggregation_functions(self):
 sdf.groupBy("a").agg(SF.percentile_approx(sdf.b, [0.1, 
0.9])).toPandas(),
 )
 
+def test_collection_functions(self):
+from pyspark.sql import functions as SF
+from pyspark.sql.connect import functions as CF
+
+query = """
+SELECT * FROM VALUES
+(ARRAY('a', 'ab'), ARRAY(1, 2, 3), ARRAY(1, NULL, 3), 1, 2, 'a'),
+(ARRAY('x', NULL), NULL, ARRAY(1, 3), 3, 4, 'x'),
+(NULL, ARRAY(-1, -2, -3), Array(), 5, 6, NULL)
+AS tab(a, b, c, d, e, f)
+"""
+# +-+++---+---++
+# |a|   b|   c|  d|  e|   f|
+# +-+++---+---++
+# |  [a, ab]|   [1, 2, 3]|[1, null, 3]|  1|  2|   a|
+# |[x, null]|null|  [1, 3]|  3|  4|   x|
+# | null|[-1, -2, -3]|  []|  5|  6|null|
+# +-+++---+---++
+
+cdf = self.connect.sql(query)
+sdf = self.spark.sql(query)
+
+for cfunc, sfunc in [
+(CF.array_distinct, SF.array_distinct),
+(CF.array_max, SF.array_max),
+(CF.array_min, SF.array_min),
+]:
+self.assert_eq(
+cdf.select(cfunc("a"), cfunc(cdf.b)).toPandas(),
+sdf.select(sfunc("a"), sfunc(sdf.b)).toPandas(),
+)
+
+for cfunc, sfunc in [
+(CF.array_except, SF.array_except),
+(CF.array_intersect, SF.array_intersect),
+(CF.array_union, SF.array_union),
+(CF.arrays_overlap, SF.arrays_overlap),
+]:
+self.assert_eq(
+cdf.select(cfunc("b", cdf.c)).toPandas(),
+sdf.select(sfunc("b", sdf.c)).toPandas(),
+)
+
+for cfunc, sfunc in [
+(CF.array_position, SF.array_position),
+(CF.array_remove, SF.array_remove),
+]:
+self.assert_eq(
+cdf.select(cfunc(cdf.a, "ab")).toPandas(),
+sdf.select(sfunc(sdf.a, "ab")).toPandas(),
+)
+
+# test array
+self.assert_eq(
+cdf.select(CF.array(cdf.d, "e")).toPandas(),
+sdf.select(SF.array(sdf.d, "e")).toPandas(),
+)
+self.assert_eq(
+cdf.select(CF.array(cdf.d, "e", CF.lit(99))).toPandas(),
+sdf.select(SF.array(sdf.d, "e", SF.lit(99))).toPandas(),
+)
+
+# test array_contains
+self.assert_eq(
+cdf.select(CF.array_contains(cdf.a, "ab")).toPandas(),
+sdf.select(SF.array_contains(sdf.a, "ab")).toPandas(),
+)
+self.assert_eq(
+cdf.select(CF.array_contains(cdf.a, cdf.f)).toPandas(),
+sdf.select(SF.array_contains(sdf.a, sdf.f)).toPandas(),
+)
+
+# test array_join
+self.assert_eq(
+cdf.select(
+CF.array_join(cdf.a, ","), CF.array_join("b", ":"), 
CF.array_join("c", "~")
+).toPandas(),
+sdf.select(
+SF.array_join(sdf.a, ","), SF.array_join("b", ":"), 
SF.array_join("c", "~")
+).toPandas(),
+)
+self.assert_eq(
+cdf.select(
+CF.array_join(cdf.a, ",", "_null_"),
+CF.array_join("b", ":", ".null."),
+CF.array_join("c", "~", "NULL"),
+).toPandas(),
+sdf.select(
+SF.array_join(sdf.a, ",", "_null_"),
+SF.array_join("b", ":", ".null."),
+SF.array_join("c", "~", "NULL"),
+).toPandas(),
+)
+
+# test array_repeat
+self.assert_eq(
+cdf.select(CF.array_repeat(cdf.f, "d")).toPandas(),
+sdf.select(SF.array_repeat(sdf.f, "d")).toPandas(),
+)
+self.assert_eq(
+cdf.select(CF.array_repeat("f", cdf.d)).toPandas(),
+sdf.select(SF.array_repeat("f", sdf.d)).toPandas(),
+)
+# TODO: Make Literal contains DataType
+#   Cannot resolve "array_repeat(f, 3)" due to data type mismatch:

Review Comment:
   I think we may need to revisit 
https://github.com/apache/spark/pull/38800#discussion_r1033281998 at some point,
   we'd better specify the `Datatype` of `3` to IntegerType instead of LongType.
   
   or we can use `Cast` as a workaround.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure 

[GitHub] [spark] huaxingao commented on a diff in pull request #38904: [SPARK-41378][SQL] Support Column Stats in DS v2

2022-12-06 Thread GitBox


huaxingao commented on code in PR #38904:
URL: https://github.com/apache/spark/pull/38904#discussion_r1041841165


##
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##
@@ -294,7 +313,30 @@ abstract class InMemoryBaseTable(
   val objectHeaderSizeInBytes = 12L
   val rowSizeInBytes = objectHeaderSizeInBytes + schema.defaultSize
   val sizeInBytes = numRows * rowSizeInBytes
-  InMemoryStats(OptionalLong.of(sizeInBytes), OptionalLong.of(numRows))
+
+  val map = new util.HashMap[NamedReference, ColumnStatistics]()
+  val colNames = readSchema.fields.map(_.name)
+  for (col <- colNames) {
+val fieldReference = FieldReference(col)
+// put some fake data for testing only
+val bin1 = InMemoryHistogramBin(1, 2, 5L)
+val bin2 = InMemoryHistogramBin(3, 4, 5L)
+val bin3 = InMemoryHistogramBin(5, 6, 5L)
+val bin4 = InMemoryHistogramBin(7, 8, 5L)
+val bin5 = InMemoryHistogramBin(9, 10, 5L)

Review Comment:
   I removed the fake data and computed NDV and null Count for testing purpose. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] huaxingao commented on pull request #38904: [SPARK-41378][SQL] Support Column Stats in DS v2

2022-12-06 Thread GitBox


huaxingao commented on PR #38904:
URL: https://github.com/apache/spark/pull/38904#issuecomment-1340511363

   > Also curious how this is to be used by Spark
   
   
   The newly added `ColumnStatistics` is converted to logical `ColumnStat` in 
this 
[method](https://github.com/apache/spark/blob/0cddab9a618dc185efc2424ea934af5aa565a213/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala#L213)
 and is used in CBO


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on a diff in pull request #38961: [SPARK-41436][CONNECT][PYTHON] Implement `collection` functions: A~C

2022-12-06 Thread GitBox


zhengruifeng commented on code in PR #38961:
URL: https://github.com/apache/spark/pull/38961#discussion_r1041844991


##
python/pyspark/sql/tests/connect/test_connect_function.py:
##
@@ -63,6 +63,24 @@ class SparkConnectFunctionTests(SparkConnectFuncTestCase):
 """These test cases exercise the interface to the proto plan
 generation but do not call Spark."""
 
+def compare_by_show(self, df1: Any, df2: Any):

Review Comment:
   `toPandas` doesn't work correctly on some complex types like `array(struct)` 
`array(map)`, add this method to check by `DataFrame.show`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang commented on a diff in pull request #38874: [SPARK-41235][SQL][PYTHON]High-order function: array_compact implementation

2022-12-06 Thread GitBox


LuciferYang commented on code in PR #38874:
URL: https://github.com/apache/spark/pull/38874#discussion_r1041845222


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##
@@ -4600,3 +4600,51 @@ case class ArrayExcept(left: Expression, right: 
Expression) extends ArrayBinaryL
   override protected def withNewChildrenInternal(
 newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = 
newLeft, right = newRight)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(array) - Removes null values from the array.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3, null));
+   [1,2,3]
+  """,
+  group = "array_funcs",
+  since = "3.4.0")
+case class ArrayCompact(child: Expression)
+  extends UnaryExpression with ExpectsInputTypes with NullIntolerant {
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)
+  override def dataType: DataType = child.dataType
+
+  @transient private lazy val elementType: DataType = 
dataType.asInstanceOf[ArrayType].elementType
+  override def checkInputDataTypes(): TypeCheckResult = {
+super.checkInputDataTypes() match {
+  case f if f.isFailure => f
+  case TypeCheckResult.TypeCheckSuccess =>
+TypeUtils.checkForOrderingExpr(elementType, prettyName)
+}
+  }
+
+  override def nullSafeEval(array: Any): Any = {
+val newArray = new Array[Any](array.asInstanceOf[ArrayData].numElements())

Review Comment:
   I think `Array` is ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on a diff in pull request #38961: [SPARK-41436][CONNECT][PYTHON] Implement `collection` functions: A~C

2022-12-06 Thread GitBox


zhengruifeng commented on code in PR #38961:
URL: https://github.com/apache/spark/pull/38961#discussion_r1041844991


##
python/pyspark/sql/tests/connect/test_connect_function.py:
##
@@ -63,6 +63,24 @@ class SparkConnectFunctionTests(SparkConnectFuncTestCase):
 """These test cases exercise the interface to the proto plan
 generation but do not call Spark."""
 
+def compare_by_show(self, df1: Any, df2: Any):

Review Comment:
   toPandas doesn't work correctly on some complex types like `array(struct)` 
`array(map)`, add this method to check by `DataFrame.show`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] huaxingao commented on a diff in pull request #38904: [SPARK-41378][SQL] Support Column Stats in DS v2

2022-12-06 Thread GitBox


huaxingao commented on code in PR #38904:
URL: https://github.com/apache/spark/pull/38904#discussion_r1041841092


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Statistics.java:
##
@@ -31,4 +35,7 @@
 public interface Statistics {
   OptionalLong sizeInBytes();
   OptionalLong numRows();
+  default Optional> columnStats() {

Review Comment:
   Changed. Thanks



##
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##
@@ -294,7 +313,30 @@ abstract class InMemoryBaseTable(
   val objectHeaderSizeInBytes = 12L
   val rowSizeInBytes = objectHeaderSizeInBytes + schema.defaultSize
   val sizeInBytes = numRows * rowSizeInBytes
-  InMemoryStats(OptionalLong.of(sizeInBytes), OptionalLong.of(numRows))
+
+  val map = new util.HashMap[NamedReference, ColumnStatistics]()
+  val colNames = readSchema.fields.map(_.name)
+  for (col <- colNames) {
+val fieldReference = FieldReference(col)
+// put some fake data for testing only
+val bin1 = InMemoryHistogramBin(1, 2, 5L)
+val bin2 = InMemoryHistogramBin(3, 4, 5L)
+val bin3 = InMemoryHistogramBin(5, 6, 5L)
+val bin4 = InMemoryHistogramBin(7, 8, 5L)
+val bin5 = InMemoryHistogramBin(9, 10, 5L)

Review Comment:
   I computed NDV and null Count for testing purpose. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] huaxingao commented on a diff in pull request #38904: [SPARK-41378][SQL] Support Column Stats in DS v2

2022-12-06 Thread GitBox


huaxingao commented on code in PR #38904:
URL: https://github.com/apache/spark/pull/38904#discussion_r1041840929


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/colstats/ColumnStatistics.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.colstats;

Review Comment:
   The classes inside this package are for Column Stats. There is one existing 
class `Statistics`,  I can't group it in the new package.  It's probably better 
to call `colstats`. 



##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/colstats/Histogram.java:
##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.colstats;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * An interface to represent an equi-height histogram, which is a part of
+ * {@link ColumnStatistics}. Equi-height histogram represents the distribution 
of
+ * a column's values by a sequence of bins.
+ *
+ * @since 3.4.0
+ */
+@Evolving
+public interface Histogram {
+  double height();

Review Comment:
   Added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] huaxingao commented on a diff in pull request #38904: [SPARK-41378][SQL] Support Column Stats in DS v2

2022-12-06 Thread GitBox


huaxingao commented on code in PR #38904:
URL: https://github.com/apache/spark/pull/38904#discussion_r1041840770


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/colstats/ColumnStatistics.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.colstats;
+
+import org.apache.spark.annotation.Evolving;
+import java.math.BigInteger;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+/**
+ * An interface to represent column statistics, which is part of
+ * {@link Statistics}.
+ *
+ * @since 3.4.0
+ */
+@Evolving
+public interface ColumnStatistics {
+  default Optional distinctCount() {
+return Optional.empty();
+  }
+
+  default Optional min() {
+return Optional.empty();
+  }
+
+  default Optional max() {
+return Optional.empty();
+  }
+
+  default Optional nullCount() {
+return Optional.empty();
+  }
+
+  default OptionalLong avgLen() {

Review Comment:
   Comments added. Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng opened a new pull request, #38961: [SPARK-41436][CONNECT][PYTHON] Implement `collection` functions: A~C

2022-12-06 Thread GitBox


zhengruifeng opened a new pull request, #38961:
URL: https://github.com/apache/spark/pull/38961

   ### What changes were proposed in this pull request?
Implement `collection` functions alphabetically, this PR contains `A` ~ `C` 
 except:
   
   - aggregate, array_sort - need the support of LambdaFunction Expression
   
   
   ### Why are the changes needed?
   
   For API coverage
   
   
   ### Does this PR introduce _any_ user-facing change?
   new APIs
   
   ### How was this patch tested?
   added UT


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] huaxingao commented on a diff in pull request #38904: [SPARK-41378][SQL] Support Column Stats in DS v2

2022-12-06 Thread GitBox


huaxingao commented on code in PR #38904:
URL: https://github.com/apache/spark/pull/38904#discussion_r1041840529


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/colstats/ColumnStatistics.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.colstats;
+
+import org.apache.spark.annotation.Evolving;
+import java.math.BigInteger;
+import java.util.Optional;
+import java.util.OptionalLong;

Review Comment:
   Fixed. Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang opened a new pull request, #38960: [SPARK-41435][SQL] Make `curdate()` throw `WRONG_NUM_ARGS ` when args is not null

2022-12-06 Thread GitBox


LuciferYang opened a new pull request, #38960:
URL: https://github.com/apache/spark/pull/38960

   ### What changes were proposed in this pull request?
   `curdate()`  throw 
`QueryCompilationErrors.invalidFunctionArgumentNumberError` with `Seq.empty` 
input when `expressions` is not empty, then the error class will be 
`_LEGACY_ERROR_TEMP_1043 `:
   
   ```json
   "_LEGACY_ERROR_TEMP_1043" : {
   "message" : [
 "Invalid arguments for function ."
   ]
 }
   ```
   
   this pr change the first input arg from `Seq.empty` to `Seq(0)` to make the 
error class use `WRONG_NUM_ARGS`:
   
   ```json
   "WRONG_NUM_ARGS" : {
   "message" : [
 "The  requires  parameters but the actual 
number is ."
   ]
 }
   ```
   
   ### Why are the changes needed?
   `WRONG_NUM_ARGS` is a more appropriate error class
   
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Add new test case
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on pull request #38958: [SPARK-41433][CONNECT] Make Max Arrow BatchSize configurable

2022-12-06 Thread GitBox


zhengruifeng commented on PR #38958:
URL: https://github.com/apache/spark/pull/38958#issuecomment-1340494395

   cc @grundprinzip @hvanhovell 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on pull request #38914: [SPARK-41381][CONNECT][PYTHON] Implement `count_distinct` and `sum_distinct` functions

2022-12-06 Thread GitBox


zhengruifeng commented on PR #38914:
URL: https://github.com/apache/spark/pull/38914#issuecomment-1340493981

   also cc @cloud-fan @grundprinzip 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang commented on a diff in pull request #38865: [SPARK-41232][SQL][PYTHON] Adding array_append function

2022-12-06 Thread GitBox


LuciferYang commented on code in PR #38865:
URL: https://github.com/apache/spark/pull/38865#discussion_r1041827973


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##
@@ -4600,3 +4600,133 @@ case class ArrayExcept(left: Expression, right: 
Expression) extends ArrayBinaryL
   override protected def withNewChildrenInternal(
 newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = 
newLeft, right = newRight)
 }
+
+/**
+ * Given an array, and another element append the element at the end of the 
array.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(array, element) - Append the element",
+  examples =
+"""
+Examples:
+  > SELECT _FUNC_(array('b', 'd', 'c', 'a'), 'd');
+   ["b","d","c","a","d"]
+
+  """,
+  since = "3.4.0",
+  group = "array_funcs")
+case class ArrayAppend(left: Expression, right: Expression)
+  extends BinaryExpression
+  with ImplicitCastInputTypes
+  with ComplexTypeMergingExpression
+  with QueryErrorsBase {
+  override def prettyName: String = "array_append"
+
+  override def inputTypes: Seq[AbstractDataType] = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(e1, hasNull), e2) =>
+TypeCoercion.findTightestCommonType(e1, e2) match {
+  case Some(dt) => Seq(ArrayType(dt, hasNull), dt)
+  case _ => Seq.empty
+}
+  case _ => Seq.empty
+}
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+val value2 = right.eval(input)
+if (value1 == null) {
+  null
+} else {
+  nullSafeEval(value1, value2)
+}
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(e1, _), e2) => if (e1.sameType(e2)) {
+TypeCheckResult.TypeCheckSuccess
+  }
+  else {
+DataTypeMismatch(
+  errorSubClass = "ARRAY_FUNCTION_DIFF_TYPES",
+  messageParameters = Map(
+"functionName" -> toSQLId(prettyName),
+"leftType" -> toSQLType(left.dataType),
+"rightType" -> toSQLType(right.dataType),
+"dataType" -> toSQLType(ArrayType)
+  ))
+  }
+  case _ =>
+DataTypeMismatch(
+  errorSubClass = "UNEXPECTED_INPUT_TYPE",
+  messageParameters = Map(
+"paramIndex" -> "0",
+"requiredType" -> toSQLType(ArrayType),
+"inputSql" -> toSQLExpr(left),
+"inputType" -> toSQLType(left.dataType)
+  )
+)
+}
+  }
+
+  protected def withNewChildrenInternal(newLeft: Expression, newRight: 
Expression): ArrayAppend =
+copy(left = newLeft, right = newRight)
+
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+val arrayData = input1.asInstanceOf[ArrayData]
+val arrayElementType = dataType.asInstanceOf[ArrayType].elementType
+val elementData = input2
+val numberOfElements = arrayData.numElements() + 1
+if (numberOfElements > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw 
QueryExecutionErrors.concatArraysWithElementsExceedLimitError(numberOfElements)
+}
+val finalData = new Array[Any](numberOfElements)
+arrayData.foreach(arrayElementType, finalData.update)
+finalData.update(numberOfElements - 1, elementData)
+new GenericArrayData(finalData)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+
+val f = (left: String, right: String) => {
+  val expr = ctx.addReferenceObj("arraysAppendExpr", this)
+  s"${ev.value} = (ArrayData)$expr.nullSafeEval($left, $right);"
+}
+
+val leftGen = left.genCode(ctx)
+val rightGen = right.genCode(ctx)

Review Comment:
   WDYT @zhengruifeng ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang commented on a diff in pull request #38865: [SPARK-41232][SQL][PYTHON] Adding array_append function

2022-12-06 Thread GitBox


LuciferYang commented on code in PR #38865:
URL: https://github.com/apache/spark/pull/38865#discussion_r1041827496


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##
@@ -4600,3 +4600,133 @@ case class ArrayExcept(left: Expression, right: 
Expression) extends ArrayBinaryL
   override protected def withNewChildrenInternal(
 newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = 
newLeft, right = newRight)
 }
+
+/**
+ * Given an array, and another element append the element at the end of the 
array.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(array, element) - Append the element",
+  examples =
+"""
+Examples:
+  > SELECT _FUNC_(array('b', 'd', 'c', 'a'), 'd');
+   ["b","d","c","a","d"]
+
+  """,
+  since = "3.4.0",
+  group = "array_funcs")
+case class ArrayAppend(left: Expression, right: Expression)
+  extends BinaryExpression
+  with ImplicitCastInputTypes
+  with ComplexTypeMergingExpression
+  with QueryErrorsBase {
+  override def prettyName: String = "array_append"
+
+  override def inputTypes: Seq[AbstractDataType] = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(e1, hasNull), e2) =>
+TypeCoercion.findTightestCommonType(e1, e2) match {
+  case Some(dt) => Seq(ArrayType(dt, hasNull), dt)
+  case _ => Seq.empty
+}
+  case _ => Seq.empty
+}
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+val value2 = right.eval(input)
+if (value1 == null) {
+  null
+} else {
+  nullSafeEval(value1, value2)
+}
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(e1, _), e2) => if (e1.sameType(e2)) {
+TypeCheckResult.TypeCheckSuccess
+  }
+  else {
+DataTypeMismatch(
+  errorSubClass = "ARRAY_FUNCTION_DIFF_TYPES",
+  messageParameters = Map(
+"functionName" -> toSQLId(prettyName),
+"leftType" -> toSQLType(left.dataType),
+"rightType" -> toSQLType(right.dataType),
+"dataType" -> toSQLType(ArrayType)
+  ))
+  }
+  case _ =>
+DataTypeMismatch(
+  errorSubClass = "UNEXPECTED_INPUT_TYPE",
+  messageParameters = Map(
+"paramIndex" -> "0",
+"requiredType" -> toSQLType(ArrayType),
+"inputSql" -> toSQLExpr(left),
+"inputType" -> toSQLType(left.dataType)
+  )
+)
+}
+  }
+
+  protected def withNewChildrenInternal(newLeft: Expression, newRight: 
Expression): ArrayAppend =
+copy(left = newLeft, right = newRight)
+
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+val arrayData = input1.asInstanceOf[ArrayData]
+val arrayElementType = dataType.asInstanceOf[ArrayType].elementType
+val elementData = input2
+val numberOfElements = arrayData.numElements() + 1
+if (numberOfElements > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw 
QueryExecutionErrors.concatArraysWithElementsExceedLimitError(numberOfElements)
+}
+val finalData = new Array[Any](numberOfElements)
+arrayData.foreach(arrayElementType, finalData.update)
+finalData.update(numberOfElements - 1, elementData)
+new GenericArrayData(finalData)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+
+val f = (left: String, right: String) => {
+  val expr = ctx.addReferenceObj("arraysAppendExpr", this)
+  s"${ev.value} = (ArrayData)$expr.nullSafeEval($left, $right);"
+}
+
+val leftGen = left.genCode(ctx)
+val rightGen = right.genCode(ctx)

Review Comment:
   Personally,  I think we should `override this behaviour` to append `null`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] infoankitp commented on a diff in pull request #38865: [SPARK-41232][SQL][PYTHON] Adding array_append function

2022-12-06 Thread GitBox


infoankitp commented on code in PR #38865:
URL: https://github.com/apache/spark/pull/38865#discussion_r1041824037


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##
@@ -4600,3 +4600,133 @@ case class ArrayExcept(left: Expression, right: 
Expression) extends ArrayBinaryL
   override protected def withNewChildrenInternal(
 newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = 
newLeft, right = newRight)
 }
+
+/**
+ * Given an array, and another element append the element at the end of the 
array.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(array, element) - Append the element",
+  examples =
+"""
+Examples:
+  > SELECT _FUNC_(array('b', 'd', 'c', 'a'), 'd');
+   ["b","d","c","a","d"]
+
+  """,
+  since = "3.4.0",
+  group = "array_funcs")
+case class ArrayAppend(left: Expression, right: Expression)
+  extends BinaryExpression
+  with ImplicitCastInputTypes
+  with ComplexTypeMergingExpression
+  with QueryErrorsBase {
+  override def prettyName: String = "array_append"
+
+  override def inputTypes: Seq[AbstractDataType] = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(e1, hasNull), e2) =>
+TypeCoercion.findTightestCommonType(e1, e2) match {
+  case Some(dt) => Seq(ArrayType(dt, hasNull), dt)
+  case _ => Seq.empty
+}
+  case _ => Seq.empty
+}
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+val value2 = right.eval(input)
+if (value1 == null) {
+  null
+} else {
+  nullSafeEval(value1, value2)
+}
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(e1, _), e2) => if (e1.sameType(e2)) {
+TypeCheckResult.TypeCheckSuccess
+  }
+  else {
+DataTypeMismatch(
+  errorSubClass = "ARRAY_FUNCTION_DIFF_TYPES",
+  messageParameters = Map(
+"functionName" -> toSQLId(prettyName),
+"leftType" -> toSQLType(left.dataType),
+"rightType" -> toSQLType(right.dataType),
+"dataType" -> toSQLType(ArrayType)
+  ))
+  }
+  case _ =>
+DataTypeMismatch(
+  errorSubClass = "UNEXPECTED_INPUT_TYPE",
+  messageParameters = Map(
+"paramIndex" -> "0",
+"requiredType" -> toSQLType(ArrayType),
+"inputSql" -> toSQLExpr(left),
+"inputType" -> toSQLType(left.dataType)
+  )
+)
+}
+  }
+
+  protected def withNewChildrenInternal(newLeft: Expression, newRight: 
Expression): ArrayAppend =
+copy(left = newLeft, right = newRight)
+
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+val arrayData = input1.asInstanceOf[ArrayData]
+val arrayElementType = dataType.asInstanceOf[ArrayType].elementType
+val elementData = input2
+val numberOfElements = arrayData.numElements() + 1
+if (numberOfElements > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw 
QueryExecutionErrors.concatArraysWithElementsExceedLimitError(numberOfElements)
+}
+val finalData = new Array[Any](numberOfElements)
+arrayData.foreach(arrayElementType, finalData.update)
+finalData.update(numberOfElements - 1, elementData)
+new GenericArrayData(finalData)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+
+val f = (left: String, right: String) => {
+  val expr = ctx.addReferenceObj("arraysAppendExpr", this)
+  s"${ev.value} = (ArrayData)$expr.nullSafeEval($left, $right);"
+}
+
+val leftGen = left.genCode(ctx)
+val rightGen = right.genCode(ctx)

Review Comment:
   Nope the elements inside the src Array are not being translated to -1. 
   
   ```
   
   val df10 = spark.createDataFrame(
 spark.sparkContext.parallelize(
   Seq(Row(Seq[Integer](1, 2, 3, null), null))),
 StructType(List(
   StructField("a", ArrayType.apply(IntegerType), true),
   StructField("b", IntegerType, true)
 ))
   )
   
   df10.selectExpr("array_append(a, b)").printSchema()
   checkAnswer(df10.selectExpr("array_append(a, b)"),
 Seq(Row(Seq(1, 2, 3, null, null)))
   )
   ```
   Output
   ```
   == Results ==
   !== Correct Answer - 1 ==  == Spark Answer - 1 ==
   !struct<>  struct>
   ![List(1, 2, 3, null, null)]   [ArrayBuffer(1, 2, 3, null, -1)]
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: 

[GitHub] [spark] beliefer commented on a diff in pull request #38672: [SPARK-41159][SQL] Optimize like any and like all expressions

2022-12-06 Thread GitBox


beliefer commented on code in PR #38672:
URL: https://github.com/apache/spark/pull/38672#discussion_r1041818122


##
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/LikeAnyBenchmark.scala:
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import java.io.File
+
+import scala.util.Random
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Benchmark to measure like any expressions performance.
+ *
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt: bin/spark-submit --class 
+ * --jars , 
+ *   2. build/sbt "sql/Test/runMain "
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt 
"sql/Test/runMain "
+ *  Results will be written to "benchmarks/LikeAnyBenchmark-results.txt".
+ * }}}
+ */
+object LikeAnyBenchmark extends SqlBasedBenchmark {

Review Comment:
   It means regression ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] navinvishy commented on a diff in pull request #38947: [SPARK-41231][SQL] Adds an array_prepend function to catalyst

2022-12-06 Thread GitBox


navinvishy commented on code in PR #38947:
URL: https://github.com/apache/spark/pull/38947#discussion_r1041817673


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##
@@ -119,21 +117,24 @@ case class Size(child: Expression, legacySizeOfNull: 
Boolean)
 val value = child.eval(input)
 if (value == null) {
   if (legacySizeOfNull) -1 else null
-} else child.dataType match {
-  case _: ArrayType => value.asInstanceOf[ArrayData].numElements()
-  case _: MapType => value.asInstanceOf[MapData].numElements()
-  case other => throw 
QueryExecutionErrors.unsupportedOperandTypeForSizeFunctionError(other)
-}
+} else

Review Comment:
   Yeah there were too many of those. I've removed them now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wankunde commented on a diff in pull request #38672: [SPARK-41159][SQL] Optimize like any and like all expressions

2022-12-06 Thread GitBox


wankunde commented on code in PR #38672:
URL: https://github.com/apache/spark/pull/38672#discussion_r1041811602


##
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/LikeAnyBenchmark.scala:
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import java.io.File
+
+import scala.util.Random
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Benchmark to measure like any expressions performance.
+ *
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt: bin/spark-submit --class 
+ * --jars , 
+ *   2. build/sbt "sql/Test/runMain "
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt 
"sql/Test/runMain "
+ *  Results will be written to "benchmarks/LikeAnyBenchmark-results.txt".
+ * }}}
+ */
+object LikeAnyBenchmark extends SqlBasedBenchmark {

Review Comment:
   Before this PR:
   ```
   [info] Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
   [info] Multi like query: Best Time(ms)   Avg 
Time(ms)   Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
   [info] 

   [info] Query with multi like  1393   
1469 119  0.0 1392586.7   1.0X
   [info] Query with LikeAny simplification  1244   
1309  97  0.0 1244382.5   1.1X
   [info] Query without LikeAny simplification400
407   8  0.0  399924.3   3.5X
   
   [info] Multi like query: Best Time(ms)   Avg 
Time(ms)   Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
   [info] 

   [info] Query with multi like  1476   
1576 149  0.0 1475710.1   1.0X
   [info] Query with LikeAny simplification  1387   
1429  37  0.0 1386669.1   1.1X
   [info] Query without LikeAny simplification430
470  35  0.0  430435.8   3.4X
   ```
   
   After this PR:
   ```
   [info] Multi like query: Best Time(ms)   Avg 
Time(ms)   Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
   [info] 

   [info] Query with multi like  1441   
1516  78  0.0 1441335.8   1.0X
   [info] Query with LikeAny simplification  1401   
1431  44  0.0 1400743.9   1.0X
   [info] Query without LikeAny simplification357
369  10  0.0  357419.8   4.0X
   
   [info] Multi like query: Best Time(ms)   Avg 
Time(ms)   Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
   [info] 

   [info] Query with multi like  1524   
1628 117  0.0 1524119.6   1.0X
   [info] Query with LikeAny simplification  1405   
1418  18  0.0 1405258.7   1.1X
   [info] Query without LikeAny simplification362
372  12  0.0  361654.4   4.2X
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional 

[GitHub] [spark] wineternity commented on pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-12-06 Thread GitBox


wineternity commented on PR #38702:
URL: https://github.com/apache/spark/pull/38702#issuecomment-1340463701

   > The change looks good to me. +CC @Ngone51
   > 
   > Btw, do you also want to remove the `if (event.taskInfo == null) {` check 
in beginning of `onTaskEnd` ?
   > 
   > Make it a precondition check ? `Preconditions.checkNotNull(event.taskInfo)`
   
   Yes, it can be change to a precondition check. Maybe I can change it in a 
new pr after test. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang commented on a diff in pull request #38940: [WIP][SPARK-41409][CORE][SQL] Rename `_LEGACY_ERROR_TEMP_1043` to `INVALID_FUNCTION_ARGS`

2022-12-06 Thread GitBox


LuciferYang commented on code in PR #38940:
URL: https://github.com/apache/spark/pull/38940#discussion_r1041802953


##
sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala:
##
@@ -638,10 +638,16 @@ class UDFSuite extends QueryTest with SharedSparkSession {
   }
 
   test("SPARK-28521 error message for CAST(parameter types contains 
DataType)") {
-val e = intercept[AnalysisException] {
-  spark.sql("SELECT CAST(1)")
-}
-assert(e.getMessage.contains("Invalid arguments for function cast"))
+checkError(
+  exception = intercept[AnalysisException] {
+sql("SELECT CAST(1)")

Review Comment:
   Do you have any suggestions on the calculation way of `validParametersCount` 
@MaxGekk ?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang commented on a diff in pull request #38940: [WIP][SPARK-41409][CORE][SQL] Rename `_LEGACY_ERROR_TEMP_1043` to `INVALID_FUNCTION_ARGS`

2022-12-06 Thread GitBox


LuciferYang commented on code in PR #38940:
URL: https://github.com/apache/spark/pull/38940#discussion_r1041802953


##
sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala:
##
@@ -638,10 +638,16 @@ class UDFSuite extends QueryTest with SharedSparkSession {
   }
 
   test("SPARK-28521 error message for CAST(parameter types contains 
DataType)") {
-val e = intercept[AnalysisException] {
-  spark.sql("SELECT CAST(1)")
-}
-assert(e.getMessage.contains("Invalid arguments for function cast"))
+checkError(
+  exception = intercept[AnalysisException] {
+sql("SELECT CAST(1)")

Review Comment:
   Do you have any suggestions on the calculation way of `validParametersCount 
`?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] sunchao commented on a diff in pull request #38924: [SPARK-41398][SQL] Relax constraints on Storage-Partitioned Join when partition keys after runtime filtering do not match

2022-12-06 Thread GitBox


sunchao commented on code in PR #38924:
URL: https://github.com/apache/spark/pull/38924#discussion_r1041789809


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala:
##
@@ -114,8 +117,21 @@ case class BatchScanExec(
   // return an empty RDD with 1 partition if dynamic filtering removed the 
only split
   sparkContext.parallelize(Array.empty[InternalRow], 1)
 } else {
+  var finalPartitions = filteredPartitions
+
+  outputPartitioning match {
+case p: KeyGroupedPartitioning =>
+  val partitionMapping = finalPartitions.map(s =>
+s.head.asInstanceOf[HasPartitionKey].partitionKey() -> s).toMap
+  finalPartitions = p.partitionValuesOpt.get.map { partKey =>

Review Comment:
   I was hoping to re-use the logic code in #38950 too, which is not related to 
runtime filtering. If moving this close to the `if (oldRows.size < 
newRows.size)` check then I may have to duplicate it somewhere else?



##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala:
##
@@ -114,8 +117,21 @@ case class BatchScanExec(
   // return an empty RDD with 1 partition if dynamic filtering removed the 
only split
   sparkContext.parallelize(Array.empty[InternalRow], 1)
 } else {
+  var finalPartitions = filteredPartitions
+
+  outputPartitioning match {
+case p: KeyGroupedPartitioning =>
+  val partitionMapping = finalPartitions.map(s =>
+s.head.asInstanceOf[HasPartitionKey].partitionKey() -> s).toMap
+  finalPartitions = p.partitionValuesOpt.get.map { partKey =>

Review Comment:
   I was hoping to re-use the logic in #38950 too, which is not related to 
runtime filtering. If moving this close to the `if (oldRows.size < 
newRows.size)` check then I may have to duplicate it somewhere else?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] akpatnam25 commented on pull request #38959: [WIP] SPARK-41415: SASL Request Retries

2022-12-06 Thread GitBox


akpatnam25 commented on PR #38959:
URL: https://github.com/apache/spark/pull/38959#issuecomment-1340410484

   cc @mridulm  @otterc @zhouyejoe 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] akpatnam25 opened a new pull request, #38959: [WIP] SPARK-41415: SASL Request Retries

2022-12-06 Thread GitBox


akpatnam25 opened a new pull request, #38959:
URL: https://github.com/apache/spark/pull/38959

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] panbingkun commented on pull request #38937: [SPARK-41406][SQL] Refactor error message for `NUM_COLUMNS_MISMATCH` to make it more generic

2022-12-06 Thread GitBox


panbingkun commented on PR #38937:
URL: https://github.com/apache/spark/pull/38937#issuecomment-1340399634

   cc @MaxGekk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

2022-12-06 Thread GitBox


HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041779007


##
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##
@@ -316,6 +320,54 @@ private[kafka010] class KafkaMicroBatchStream(
 }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+  endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+val tpsForEndOffset = endPartitionOffsets.keySet
+assert(tpsForPrefetched == tpsForEndOffset,
+  "Kafka data source in Trigger.AvailableNow should provide the same topic 
partitions in " +
+"pre-fetched offset to end offset for each microbatch. " +
+s"topic-partitions for pre-fetched offset: $tpsForPrefetched, " +
+s"topic-partitions for end offset: $tpsForEndOffset.")
+
+val endOffsetHasGreaterOrEqualOffsetComparedToPrefetched = {
+  allDataForTriggerAvailableNow.keySet.forall { tp =>
+val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+val offsetFromEndOffset = endPartitionOffsets(tp)
+offsetFromEndOffset <= offsetFromPrefetched
+  }
+}
+assert(endOffsetHasGreaterOrEqualOffsetComparedToPrefetched,

Review Comment:
   I just dealt with error class framework (separate one for Kafka data source) 
as well as making errors to non-internal (user-facing).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang commented on a diff in pull request #38940: [WIP][SPARK-41409][CORE][SQL] Rename `_LEGACY_ERROR_TEMP_1043` to `INVALID_FUNCTION_ARGS`

2022-12-06 Thread GitBox


LuciferYang commented on code in PR #38940:
URL: https://github.com/apache/spark/pull/38940#discussion_r1041778431


##
sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala:
##
@@ -638,10 +638,16 @@ class UDFSuite extends QueryTest with SharedSparkSession {
   }
 
   test("SPARK-28521 error message for CAST(parameter types contains 
DataType)") {
-val e = intercept[AnalysisException] {
-  spark.sql("SELECT CAST(1)")
-}
-assert(e.getMessage.contains("Invalid arguments for function cast"))
+checkError(
+  exception = intercept[AnalysisException] {
+sql("SELECT CAST(1)")

Review Comment:
   
https://github.com/apache/spark/blob/37453ad5c85122d11b436813b4f5eddf615a6cf8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L137-L144
   
   In this scenario, the `validParametersCount` is also empty, so 
`WRONG_NUM_ARGS ` cannot be reused now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #38924: [SPARK-41398][SQL] Relax constraints on Storage-Partitioned Join when partition keys after runtime filtering do not match

2022-12-06 Thread GitBox


cloud-fan commented on code in PR #38924:
URL: https://github.com/apache/spark/pull/38924#discussion_r1041775153


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala:
##
@@ -114,8 +117,21 @@ case class BatchScanExec(
   // return an empty RDD with 1 partition if dynamic filtering removed the 
only split
   sparkContext.parallelize(Array.empty[InternalRow], 1)
 } else {
+  var finalPartitions = filteredPartitions
+
+  outputPartitioning match {
+case p: KeyGroupedPartitioning =>
+  val partitionMapping = finalPartitions.map(s =>
+s.head.asInstanceOf[HasPartitionKey].partitionKey() -> s).toMap
+  finalPartitions = p.partitionValuesOpt.get.map { partKey =>

Review Comment:
   It's better to centralize the related code.
   
   This empty filling is quite related to the `if (oldRows.size < 
newRows.size)` check. We should put them together if possible



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38874: [SPARK-41235][SQL][PYTHON]High-order function: array_compact implementation

2022-12-06 Thread GitBox


HyukjinKwon commented on code in PR #38874:
URL: https://github.com/apache/spark/pull/38874#discussion_r1041775256


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##
@@ -4600,3 +4600,51 @@ case class ArrayExcept(left: Expression, right: 
Expression) extends ArrayBinaryL
   override protected def withNewChildrenInternal(
 newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = 
newLeft, right = newRight)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(array) - Removes null values from the array.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3, null));
+   [1,2,3]
+  """,
+  group = "array_funcs",
+  since = "3.4.0")
+case class ArrayCompact(child: Expression)

Review Comment:
   +1 for @zhengruifeng for now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #38924: [SPARK-41398][SQL] Relax constraints on Storage-Partitioned Join when partition keys after runtime filtering do not match

2022-12-06 Thread GitBox


cloud-fan commented on code in PR #38924:
URL: https://github.com/apache/spark/pull/38924#discussion_r1041775153


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala:
##
@@ -114,8 +117,21 @@ case class BatchScanExec(
   // return an empty RDD with 1 partition if dynamic filtering removed the 
only split
   sparkContext.parallelize(Array.empty[InternalRow], 1)
 } else {
+  var finalPartitions = filteredPartitions
+
+  outputPartitioning match {
+case p: KeyGroupedPartitioning =>
+  val partitionMapping = finalPartitions.map(s =>
+s.head.asInstanceOf[HasPartitionKey].partitionKey() -> s).toMap
+  finalPartitions = p.partitionValuesOpt.get.map { partKey =>

Review Comment:
   It's better to centralize the related code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon closed pull request #38957: [SPARK-41369][CONNECT][BUILD][FOLLOW-UP] Update connect server module name

2022-12-06 Thread GitBox


HyukjinKwon closed pull request #38957: 
[SPARK-41369][CONNECT][BUILD][FOLLOW-UP] Update connect server module name
URL: https://github.com/apache/spark/pull/38957


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #38957: [SPARK-41369][CONNECT][BUILD][FOLLOW-UP] Update connect server module name

2022-12-06 Thread GitBox


HyukjinKwon commented on PR #38957:
URL: https://github.com/apache/spark/pull/38957#issuecomment-1340392569

   Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] sandeep-katta commented on a diff in pull request #38874: [SPARK-41235][SQL][PYTHON]High-order function: array_compact implementation

2022-12-06 Thread GitBox


sandeep-katta commented on code in PR #38874:
URL: https://github.com/apache/spark/pull/38874#discussion_r1041772280


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##
@@ -4600,3 +4600,51 @@ case class ArrayExcept(left: Expression, right: 
Expression) extends ArrayBinaryL
   override protected def withNewChildrenInternal(
 newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = 
newLeft, right = newRight)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(array) - Removes null values from the array.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3, null));
+   [1,2,3]
+  """,
+  group = "array_funcs",
+  since = "3.4.0")
+case class ArrayCompact(child: Expression)
+  extends UnaryExpression with ExpectsInputTypes with NullIntolerant {
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)
+  override def dataType: DataType = child.dataType
+
+  @transient private lazy val elementType: DataType = 
dataType.asInstanceOf[ArrayType].elementType
+  override def checkInputDataTypes(): TypeCheckResult = {
+super.checkInputDataTypes() match {
+  case f if f.isFailure => f
+  case TypeCheckResult.TypeCheckSuccess =>
+TypeUtils.checkForOrderingExpr(elementType, prettyName)
+}
+  }
+
+  override def nullSafeEval(array: Any): Any = {
+val newArray = new Array[Any](array.asInstanceOf[ArrayData].numElements())

Review Comment:
   I am slicing the array at the end like `new 
GenericArrayData(newArray.slice(0, pos))`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #38937: [SPARK-41406][SQL] Refactor error message for `NUM_COLUMNS_MISMATCH` to make it more generic

2022-12-06 Thread GitBox


AmplabJenkins commented on PR #38937:
URL: https://github.com/apache/spark/pull/38937#issuecomment-1340389026

   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on a diff in pull request #38672: [SPARK-41159][SQL] Optimize like any and like all expressions

2022-12-06 Thread GitBox


beliefer commented on code in PR #38672:
URL: https://github.com/apache/spark/pull/38672#discussion_r1041767560


##
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/LikeAnyBenchmark.scala:
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import java.io.File
+
+import scala.util.Random
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Benchmark to measure like any expressions performance.
+ *
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt: bin/spark-submit --class 
+ * --jars , 
+ *   2. build/sbt "sql/Test/runMain "
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt 
"sql/Test/runMain "
+ *  Results will be written to "benchmarks/LikeAnyBenchmark-results.txt".
+ * }}}
+ */
+object LikeAnyBenchmark extends SqlBasedBenchmark {

Review Comment:
   The benchmark cannot prove the performance improvement. Could you test with 
or without `MatchMultiHelper` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on a diff in pull request #38672: [SPARK-41159][SQL] Optimize like any and like all expressions

2022-12-06 Thread GitBox


beliefer commented on code in PR #38672:
URL: https://github.com/apache/spark/pull/38672#discussion_r1041765598


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala:
##
@@ -248,13 +248,90 @@ case class ILike(
   }
 }
 
+case class MatchMultiHelper(
+isNotSpecified: Boolean,
+partialMatch: Boolean,
+patterns: Seq[UTF8String]) {

Review Comment:
   `LikeSimplification` have the similar optimization. Why need this class ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng opened a new pull request, #38958: [SPARK-41433][CONNECT] Make Max Arrow BatchSize configurable

2022-12-06 Thread GitBox


zhengruifeng opened a new pull request, #38958:
URL: https://github.com/apache/spark/pull/38958

   ### What changes were proposed in this pull request?
   Make Max Arrow BatchSize configurable
   
   
   ### Why are the changes needed?
   make batchsize configurable
   
   
   ### Does this PR introduce _any_ user-facing change?
   yes, one new configration
   
   
   ### How was this patch tested?
   existing tests


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] amaliujia closed pull request #38908: [SPARK-41384][CONNECT] Should use SQLExpression for str arguments in Projection

2022-12-06 Thread GitBox


amaliujia closed pull request #38908: [SPARK-41384][CONNECT] Should use 
SQLExpression for str arguments in Projection
URL: https://github.com/apache/spark/pull/38908


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] amaliujia commented on a diff in pull request #38899: [SPARK-41349][CONNECT] Implement DataFrame.hint

2022-12-06 Thread GitBox


amaliujia commented on code in PR #38899:
URL: https://github.com/apache/spark/pull/38899#discussion_r1041759325


##
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.catalyst.{expressions, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.{CreateArray, CreateMap, 
CreateStruct}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+object LiteralValueProtoConverter {
+
+  /**
+   * Transforms the protocol buffers literals into the appropriate Catalyst 
literal expression.
+   *
+   * @return
+   *   Expression

Review Comment:
   hmmm we actually enforce scalafmt for Spark Connect code. So if this pass 
scalafmt that is fine.
   
   Only Spark Connect enforce that so if you are seeing this is different from 
other modules, that is expected.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] amaliujia commented on a diff in pull request #38899: [SPARK-41349][CONNECT] Implement DataFrame.hint

2022-12-06 Thread GitBox


amaliujia commented on code in PR #38899:
URL: https://github.com/apache/spark/pull/38899#discussion_r1041759325


##
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.catalyst.{expressions, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.{CreateArray, CreateMap, 
CreateStruct}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+object LiteralValueProtoConverter {
+
+  /**
+   * Transforms the protocol buffers literals into the appropriate Catalyst 
literal expression.
+   *
+   * @return
+   *   Expression

Review Comment:
   hmmm we actually enable scalafmt for Spark Connect code. So if this pass 
scalafmt that is fine.
   
   Only Spark Connect enforce that so if you are seeing this is different from 
other modules, that is expected.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on a diff in pull request #38899: [SPARK-41349][CONNECT] Implement DataFrame.hint

2022-12-06 Thread GitBox


beliefer commented on code in PR #38899:
URL: https://github.com/apache/spark/pull/38899#discussion_r1041753682


##
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.catalyst.{expressions, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.{CreateArray, CreateMap, 
CreateStruct}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+object LiteralValueProtoConverter {
+
+  /**
+   * Transforms the protocol buffers literals into the appropriate Catalyst 
literal expression.
+   *
+   * @return
+   *   Expression

Review Comment:
   Yeah. But it seems strange. cc @amaliujia 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side read metrics

2022-12-06 Thread GitBox


mridulm commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1041749200


##
core/src/test/resources/HistoryServerExpectations/excludeOnFailure_node_for_stage_expectation.json:
##
@@ -81,7 +93,19 @@
   "remoteBytesRead" : 0,
   "remoteBytesReadToDisk" : 0,
   "localBytesRead" : 0,
-  "recordsRead" : 0
+  "recordsRead" : 0,
+  "pushBased": {

Review Comment:
   Yes, that would be better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-06 Thread GitBox


mridulm commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1041745719


##
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) 
extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   Since we are making an API change, given SPARK-37831, we should add 
`partitionId` as well.
   We can use either of them in `ExecutorAllocationManager`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-06 Thread GitBox


mridulm commented on code in PR #38711:
URL: https://github.com/apache/spark/pull/38711#discussion_r1041745719


##
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala:
##
@@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) 
extends SparkListe
 @DeveloperApi
 case class SparkListenerSpeculativeTaskSubmitted(

Review Comment:
   Since we are making an API change, given SPARK-37831, we should add 
`partitionId` as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] sunchao commented on a diff in pull request #38924: [SPARK-41398][SQL] Relax constraints on Storage-Partitioned Join when partition keys after runtime filtering do not match

2022-12-06 Thread GitBox


sunchao commented on code in PR #38924:
URL: https://github.com/apache/spark/pull/38924#discussion_r1041745736


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala:
##
@@ -114,8 +117,21 @@ case class BatchScanExec(
   // return an empty RDD with 1 partition if dynamic filtering removed the 
only split
   sparkContext.parallelize(Array.empty[InternalRow], 1)
 } else {
+  var finalPartitions = filteredPartitions
+
+  outputPartitioning match {
+case p: KeyGroupedPartitioning =>
+  val partitionMapping = finalPartitions.map(s =>
+s.head.asInstanceOf[HasPartitionKey].partitionKey() -> s).toMap
+  finalPartitions = p.partitionValuesOpt.get.map { partKey =>

Review Comment:
   Hmm any obvious advantage of this? It looks the same to me  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] amaliujia commented on pull request #38957: [SPARK-41369][CONNECT][BUILD][FOLLOW-UP] Update connect server module name

2022-12-06 Thread GitBox


amaliujia commented on PR #38957:
URL: https://github.com/apache/spark/pull/38957#issuecomment-1340358549

   @HyukjinKwon @hvanhovell 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] amaliujia opened a new pull request, #38957: [SPARK-41369][CONNECT][BUILD][FOLLOW-UP] Update connect server module name

2022-12-06 Thread GitBox


amaliujia opened a new pull request, #38957:
URL: https://github.com/apache/spark/pull/38957

   
   
   ### What changes were proposed in this pull request?
   
   The current maven package is not showing connect server as the name:
   
   https://user-images.githubusercontent.com/1938382/206088497-1e3af988-d6cb-473d-9e99-23f4827349ef.png;>
   
   This PR changes the name to match that it is a server module now.
   
   ### Why are the changes needed?
   
   Showing correct name when run maven package.
   
   ### Does this PR introduce _any_ user-facing change?
   
   NO
   
   ### How was this patch tested?
   
   N/A


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang commented on a diff in pull request #38933: [SPARK-41404][SQL][TESTS] Refactor `ColumnVectorUtils#toBatch` to make `ColumnarBatchSuite#testRandomRows` test more dataType

2022-12-06 Thread GitBox


LuciferYang commented on code in PR #38933:
URL: https://github.com/apache/spark/pull/38933#discussion_r1041743714


##
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java:
##
@@ -165,7 +171,17 @@ private static void appendValue(WritableColumnVector dst, 
DataType t, Object o)
 dst.getChild(1).appendInt(c.days);
 dst.getChild(2).appendLong(c.microseconds);
   } else if (t instanceof DateType) {
-dst.appendInt(DateTimeUtils.fromJavaDate((Date)o));
+if (o instanceof Date) {

Review Comment:
   OK, change to only test `Date` and `Timestamp` due they are default



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on pull request #38921: [SPARK-41397][CONNECT][PYTHON] Implement part of string/binary functions

2022-12-06 Thread GitBox


zhengruifeng commented on PR #38921:
URL: https://github.com/apache/spark/pull/38921#issuecomment-1340345754

   merged into master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng closed pull request #38921: [SPARK-41397][CONNECT][PYTHON] Implement part of string/binary functions

2022-12-06 Thread GitBox


zhengruifeng closed pull request #38921: [SPARK-41397][CONNECT][PYTHON] 
Implement part of string/binary functions
URL: https://github.com/apache/spark/pull/38921


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wankunde commented on pull request #38672: [SPARK-41159][SQL] Optimize like any and like all expressions

2022-12-06 Thread GitBox


wankunde commented on PR #38672:
URL: https://github.com/apache/spark/pull/38672#issuecomment-1340342118

   Hi, @beliefer @cloud-fan @wangyum Could you help to review this PR? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] tedyu commented on pull request #38948: [SPARK-41419][K8S] Decrement PVC_COUNTER when the pod deletion happens

2022-12-06 Thread GitBox


tedyu commented on PR #38948:
URL: https://github.com/apache/spark/pull/38948#issuecomment-1340342014

   In ExecutorPodsAllocatorSuite.scala, the pair of configs always have the 
following values:
   ```
 .set(KUBERNETES_DRIVER_OWN_PVC.key, "true")
 .set(KUBERNETES_DRIVER_REUSE_PVC.key, "true")
   ```
   If one of them is false, `requestNewExecutors` would proceed with creating 
pod and PVC.
   
   @dongjoon-hyun 
   Do you know why there is no test with either of these configs as false ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on a diff in pull request #38938: [WIP][SPARK-41403][CONNECT][PYTHON] Implement `DataFrame.describe`

2022-12-06 Thread GitBox


beliefer commented on code in PR #38938:
URL: https://github.com/apache/spark/pull/38938#discussion_r1041729116


##
python/pyspark/sql/connect/dataframe.py:
##
@@ -1239,6 +1239,16 @@ def summary(self, *statistics: str) -> "DataFrame":
 session=self._session,
 )
 
+def describe(self, *cols: str) -> "DataFrame":
+_cols: List[str] = list(cols)

Review Comment:
   I will update it again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on a diff in pull request #38914: [SPARK-41381][CONNECT][PYTHON] Implement `count_distinct` and `sum_distinct` functions

2022-12-06 Thread GitBox


zhengruifeng commented on code in PR #38914:
URL: https://github.com/apache/spark/pull/38914#discussion_r1041728393


##
python/pyspark/sql/tests/connect/test_connect_function.py:
##
@@ -413,6 +412,22 @@ def test_aggregation_functions(self):
 sdf.groupBy("a").agg(SF.percentile_approx(sdf.b, [0.1, 
0.9])).toPandas(),
 )
 
+# test count_distinct
+self.assert_eq(
+cdf.select(CF.count_distinct("b"), 
CF.count_distinct(cdf.c)).toPandas(),
+sdf.select(SF.count_distinct("b"), 
SF.count_distinct(sdf.c)).toPandas(),
+)
+# The output column names of 'groupBy.agg(count_distinct)' in PySpark

Review Comment:
   this incorrect column name seems not easy to fix, since it should not be 
related to this PR, I change the column names and compare the values here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun closed pull request #38949: [SPARK-41410][K8S][FOLLOWUP] Remove PVC_COUNTER decrement

2022-12-06 Thread GitBox


dongjoon-hyun closed pull request #38949: [SPARK-41410][K8S][FOLLOWUP] Remove 
PVC_COUNTER decrement
URL: https://github.com/apache/spark/pull/38949


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on pull request #38949: [SPARK-41410][K8S][FOLLOWUP] Remove PVC_COUNTER decrement

2022-12-06 Thread GitBox


dongjoon-hyun commented on PR #38949:
URL: https://github.com/apache/spark/pull/38949#issuecomment-1340334817

   Thank you so much, @viirya . Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #38949: [SPARK-41410][K8S][FOLLOWUP] Remove PVC_COUNTER decrement

2022-12-06 Thread GitBox


dongjoon-hyun commented on code in PR #38949:
URL: https://github.com/apache/spark/pull/38949#discussion_r1041724958


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala:
##
@@ -455,7 +455,6 @@ class ExecutorPodsAllocator(
 .inNamespace(namespace)
 .resource(createdExecutorPod)
 .delete()
-  PVC_COUNTER.decrementAndGet()

Review Comment:
   Since this is used for driver-owned PVCs, the deletion of pods is irrelevant 
to the number of PVC. That was my logical bug at the initial patch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #38949: [SPARK-41410][K8S][FOLLOWUP] Remove PVC_COUNTER decrement

2022-12-06 Thread GitBox


dongjoon-hyun commented on code in PR #38949:
URL: https://github.com/apache/spark/pull/38949#discussion_r1041724958


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala:
##
@@ -455,7 +455,6 @@ class ExecutorPodsAllocator(
 .inNamespace(namespace)
 .resource(createdExecutorPod)
 .delete()
-  PVC_COUNTER.decrementAndGet()

Review Comment:
   Since this is driver-owned PVCs, the deletion of pods is irrelevant to the 
number of PVC. That was my logical bug at the initial patch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang commented on a diff in pull request #38954: [SPARK-41417][CORE][SQL] Rename `_LEGACY_ERROR_TEMP_0019` to `CANNOT_PARSE_VALUE_TO_DATATYPE`

2022-12-06 Thread GitBox


LuciferYang commented on code in PR #38954:
URL: https://github.com/apache/spark/pull/38954#discussion_r1041724482


##
sql/core/src/test/resources/sql-tests/results/literals.sql.out:
##
@@ -353,10 +353,12 @@ pattern%  no-pattern\%pattern\%   pattern\\%
 select '\'', '"', '\n', '\r', '\t', 'Z'
 -- !query schema
 struct<':string,":string,
-:string,:string,   :string,Z:string>
+:string,
+:string,   :string,Z:string>
 -- !query output
 '  "   
-   Z

Review Comment:
   fiixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #38949: [SPARK-41410][K8S][FOLLOWUP] Remove PVC_COUNTER decrement

2022-12-06 Thread GitBox


dongjoon-hyun commented on code in PR #38949:
URL: https://github.com/apache/spark/pull/38949#discussion_r1041724358


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala:
##
@@ -455,7 +455,6 @@ class ExecutorPodsAllocator(
 .inNamespace(namespace)
 .resource(createdExecutorPod)
 .delete()
-  PVC_COUNTER.decrementAndGet()

Review Comment:
   Yes, right for case 1 and 2. So, this is a complete and minimal patch, 
@viirya .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gengliangwang commented on pull request #38882: [SPARK-41365][UI] Stages UI page fails to load for proxy in specific yarn environment

2022-12-06 Thread GitBox


gengliangwang commented on PR #38882:
URL: https://github.com/apache/spark/pull/38882#issuecomment-1340331429

   @yabola I am not quite sure about the "yarn proxy" you mentioned.
   Can we fix the issue in a narrow waist method? IIUC there are also ajax 
requests in the executor page.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on pull request #38915: [SPARK-41382][CONNECT][PYTHON] Implement `product` function

2022-12-06 Thread GitBox


zhengruifeng commented on PR #38915:
URL: https://github.com/apache/spark/pull/38915#issuecomment-1340330948

   merged into master, thank you for reviews


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng closed pull request #38915: [SPARK-41382][CONNECT][PYTHON] Implement `product` function

2022-12-06 Thread GitBox


zhengruifeng closed pull request #38915: [SPARK-41382][CONNECT][PYTHON] 
Implement `product` function
URL: https://github.com/apache/spark/pull/38915


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on a diff in pull request #38956: [DO_NOT_MERGE] Implement Column.{when, otherwise} and Function when with UnresolvedFunction

2022-12-06 Thread GitBox


zhengruifeng commented on code in PR #38956:
URL: https://github.com/apache/spark/pull/38956#discussion_r1041722230


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala:
##
@@ -161,6 +161,15 @@ case class CaseWhen(
 elseValue: Option[Expression] = None)
   extends ComplexTypeMergingExpression with ConditionalExpression {
 
+  def this(branches: Seq[Expression]) = this(

Review Comment:
   still fail with:
   ```
   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC 
that terminated with:
   status = StatusCode.UNKNOWN
   details = "[FAILED_FUNCTION_CALL] Failed preparing of the function 
`when` for call. Please, double check function's arguments."
   debug_error_string = 
"{"created":"@1670384303.828721000","description":"Error received from peer 
ipv6:[::1]:15002","file":"src/core/lib/surface/call.cc","file_line":1064,"grpc_message":"[FAILED_FUNCTION_CALL]
 Failed preparing of the function `when` for call. Please, double check 
function's arguments.","grpc_status":2}"
   >
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #38924: [SPARK-41398][SQL] Relax constraints on Storage-Partitioned Join when partition keys after runtime filtering do not match

2022-12-06 Thread GitBox


cloud-fan commented on code in PR #38924:
URL: https://github.com/apache/spark/pull/38924#discussion_r1041721325


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala:
##
@@ -114,8 +117,21 @@ case class BatchScanExec(
   // return an empty RDD with 1 partition if dynamic filtering removed the 
only split
   sparkContext.parallelize(Array.empty[InternalRow], 1)
 } else {
+  var finalPartitions = filteredPartitions
+
+  outputPartitioning match {
+case p: KeyGroupedPartitioning =>
+  val partitionMapping = finalPartitions.map(s =>
+s.head.asInstanceOf[HasPartitionKey].partitionKey() -> s).toMap
+  finalPartitions = p.partitionValuesOpt.get.map { partKey =>

Review Comment:
   shall we move this logic to `lazy val filteredPartitions`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on a diff in pull request #38956: [DO_NOT_MERGE] Implement Column.{when, otherwise} and Function when with UnresolvedFunction

2022-12-06 Thread GitBox


zhengruifeng commented on code in PR #38956:
URL: https://github.com/apache/spark/pull/38956#discussion_r1041717834


##
python/pyspark/sql/connect/column.py:
##
@@ -129,6 +140,53 @@ def name(self) -> str:
 ...
 
 
+class CaseWhen(Expression):
+def __init__(
+self, branches: Sequence[Tuple[Expression, Expression]], else_value: 
Optional[Expression]
+):
+
+assert isinstance(branches, list)
+for branch in branches:
+assert (
+isinstance(branch, tuple)
+and len(branch) == 2
+and all(isinstance(expr, Expression) for expr in branch)
+)
+self._branches = branches
+
+if else_value is not None:
+assert isinstance(else_value, Expression)
+
+self._else_value = else_value
+
+def to_plan(self, session: "SparkConnectClient") -> "proto.Expression":
+# fun = proto.Expression()
+# fun.unresolved_function.function_name = "when"
+# for condition, value in self._branches:
+# 
fun.unresolved_function.arguments.extend(condition.to_plan(session))
+# fun.unresolved_function.arguments.extend(value.to_plan(session))
+# if self._else_value is not None:
+# 
fun.unresolved_function.arguments.extend(self._else_value.to_plan(session))
+# return fun
+
+args: Sequence[Expression] = []
+for condition, value in self._branches:
+args.append(condition)
+args.append(value)
+
+if self._else_value is not None:
+args.append(self._else_value)
+
+unresolved_function = UnresolvedFunction(name="when", args=args)
+
+return unresolved_function.to_plan(session)

Review Comment:
   let me add another constructor



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang commented on a diff in pull request #38865: [SPARK-41232][SQL][PYTHON] Adding array_append function

2022-12-06 Thread GitBox


LuciferYang commented on code in PR #38865:
URL: https://github.com/apache/spark/pull/38865#discussion_r1041716971


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##
@@ -4600,3 +4600,133 @@ case class ArrayExcept(left: Expression, right: 
Expression) extends ArrayBinaryL
   override protected def withNewChildrenInternal(
 newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = 
newLeft, right = newRight)
 }
+
+/**
+ * Given an array, and another element append the element at the end of the 
array.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(array, element) - Append the element",
+  examples =
+"""
+Examples:
+  > SELECT _FUNC_(array('b', 'd', 'c', 'a'), 'd');
+   ["b","d","c","a","d"]
+
+  """,
+  since = "3.4.0",
+  group = "array_funcs")
+case class ArrayAppend(left: Expression, right: Expression)
+  extends BinaryExpression
+  with ImplicitCastInputTypes
+  with ComplexTypeMergingExpression
+  with QueryErrorsBase {
+  override def prettyName: String = "array_append"
+
+  override def inputTypes: Seq[AbstractDataType] = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(e1, hasNull), e2) =>
+TypeCoercion.findTightestCommonType(e1, e2) match {
+  case Some(dt) => Seq(ArrayType(dt, hasNull), dt)
+  case _ => Seq.empty
+}
+  case _ => Seq.empty
+}
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+val value2 = right.eval(input)
+if (value1 == null) {
+  null
+} else {
+  nullSafeEval(value1, value2)
+}
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(e1, _), e2) => if (e1.sameType(e2)) {
+TypeCheckResult.TypeCheckSuccess
+  }
+  else {
+DataTypeMismatch(
+  errorSubClass = "ARRAY_FUNCTION_DIFF_TYPES",
+  messageParameters = Map(
+"functionName" -> toSQLId(prettyName),
+"leftType" -> toSQLType(left.dataType),
+"rightType" -> toSQLType(right.dataType),
+"dataType" -> toSQLType(ArrayType)
+  ))
+  }
+  case _ =>
+DataTypeMismatch(
+  errorSubClass = "UNEXPECTED_INPUT_TYPE",
+  messageParameters = Map(
+"paramIndex" -> "0",
+"requiredType" -> toSQLType(ArrayType),
+"inputSql" -> toSQLExpr(left),
+"inputType" -> toSQLType(left.dataType)
+  )
+)
+}
+  }
+
+  protected def withNewChildrenInternal(newLeft: Expression, newRight: 
Expression): ArrayAppend =
+copy(left = newLeft, right = newRight)
+
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+val arrayData = input1.asInstanceOf[ArrayData]
+val arrayElementType = dataType.asInstanceOf[ArrayType].elementType
+val elementData = input2
+val numberOfElements = arrayData.numElements() + 1
+if (numberOfElements > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw 
QueryExecutionErrors.concatArraysWithElementsExceedLimitError(numberOfElements)
+}
+val finalData = new Array[Any](numberOfElements)
+arrayData.foreach(arrayElementType, finalData.update)
+finalData.update(numberOfElements - 1, elementData)
+new GenericArrayData(finalData)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+
+val f = (left: String, right: String) => {
+  val expr = ctx.addReferenceObj("arraysAppendExpr", this)
+  s"${ev.value} = (ArrayData)$expr.nullSafeEval($left, $right);"
+}
+
+val leftGen = left.genCode(ctx)
+val rightGen = right.genCode(ctx)

Review Comment:
   > @LuciferYang One more thing I noticed, if we are having a null int value, 
it is getting translated to -1. It is picking up the default value for Integer 
primitive type, in the JavaCode. I think we should override this behaviour 
right ?
   
   
   
   Will `null` in the src Array also be translated to `-1`?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang commented on a diff in pull request #38865: [SPARK-41232][SQL][PYTHON] Adding array_append function

2022-12-06 Thread GitBox


LuciferYang commented on code in PR #38865:
URL: https://github.com/apache/spark/pull/38865#discussion_r1041716971


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##
@@ -4600,3 +4600,133 @@ case class ArrayExcept(left: Expression, right: 
Expression) extends ArrayBinaryL
   override protected def withNewChildrenInternal(
 newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = 
newLeft, right = newRight)
 }
+
+/**
+ * Given an array, and another element append the element at the end of the 
array.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(array, element) - Append the element",
+  examples =
+"""
+Examples:
+  > SELECT _FUNC_(array('b', 'd', 'c', 'a'), 'd');
+   ["b","d","c","a","d"]
+
+  """,
+  since = "3.4.0",
+  group = "array_funcs")
+case class ArrayAppend(left: Expression, right: Expression)
+  extends BinaryExpression
+  with ImplicitCastInputTypes
+  with ComplexTypeMergingExpression
+  with QueryErrorsBase {
+  override def prettyName: String = "array_append"
+
+  override def inputTypes: Seq[AbstractDataType] = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(e1, hasNull), e2) =>
+TypeCoercion.findTightestCommonType(e1, e2) match {
+  case Some(dt) => Seq(ArrayType(dt, hasNull), dt)
+  case _ => Seq.empty
+}
+  case _ => Seq.empty
+}
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+val value2 = right.eval(input)
+if (value1 == null) {
+  null
+} else {
+  nullSafeEval(value1, value2)
+}
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(e1, _), e2) => if (e1.sameType(e2)) {
+TypeCheckResult.TypeCheckSuccess
+  }
+  else {
+DataTypeMismatch(
+  errorSubClass = "ARRAY_FUNCTION_DIFF_TYPES",
+  messageParameters = Map(
+"functionName" -> toSQLId(prettyName),
+"leftType" -> toSQLType(left.dataType),
+"rightType" -> toSQLType(right.dataType),
+"dataType" -> toSQLType(ArrayType)
+  ))
+  }
+  case _ =>
+DataTypeMismatch(
+  errorSubClass = "UNEXPECTED_INPUT_TYPE",
+  messageParameters = Map(
+"paramIndex" -> "0",
+"requiredType" -> toSQLType(ArrayType),
+"inputSql" -> toSQLExpr(left),
+"inputType" -> toSQLType(left.dataType)
+  )
+)
+}
+  }
+
+  protected def withNewChildrenInternal(newLeft: Expression, newRight: 
Expression): ArrayAppend =
+copy(left = newLeft, right = newRight)
+
+  override protected def nullSafeEval(input1: Any, input2: Any): Any = {
+val arrayData = input1.asInstanceOf[ArrayData]
+val arrayElementType = dataType.asInstanceOf[ArrayType].elementType
+val elementData = input2
+val numberOfElements = arrayData.numElements() + 1
+if (numberOfElements > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw 
QueryExecutionErrors.concatArraysWithElementsExceedLimitError(numberOfElements)
+}
+val finalData = new Array[Any](numberOfElements)
+arrayData.foreach(arrayElementType, finalData.update)
+finalData.update(numberOfElements - 1, elementData)
+new GenericArrayData(finalData)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+
+val f = (left: String, right: String) => {
+  val expr = ctx.addReferenceObj("arraysAppendExpr", this)
+  s"${ev.value} = (ArrayData)$expr.nullSafeEval($left, $right);"
+}
+
+val leftGen = left.genCode(ctx)
+val rightGen = right.genCode(ctx)

Review Comment:
   > Integer
   
   Will `null` in the src Array also be translated to `-1`?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on a diff in pull request #38956: [DO_NOT_MERGE] Implement Column.{when, otherwise} and Function when with UnresolvedFunction

2022-12-06 Thread GitBox


zhengruifeng commented on code in PR #38956:
URL: https://github.com/apache/spark/pull/38956#discussion_r1041716422


##
python/pyspark/sql/connect/column.py:
##
@@ -129,6 +140,53 @@ def name(self) -> str:
 ...
 
 
+class CaseWhen(Expression):
+def __init__(
+self, branches: Sequence[Tuple[Expression, Expression]], else_value: 
Optional[Expression]
+):
+
+assert isinstance(branches, list)
+for branch in branches:
+assert (
+isinstance(branch, tuple)
+and len(branch) == 2
+and all(isinstance(expr, Expression) for expr in branch)
+)
+self._branches = branches
+
+if else_value is not None:
+assert isinstance(else_value, Expression)
+
+self._else_value = else_value
+
+def to_plan(self, session: "SparkConnectClient") -> "proto.Expression":
+# fun = proto.Expression()
+# fun.unresolved_function.function_name = "when"
+# for condition, value in self._branches:
+# 
fun.unresolved_function.arguments.extend(condition.to_plan(session))
+# fun.unresolved_function.arguments.extend(value.to_plan(session))
+# if self._else_value is not None:
+# 
fun.unresolved_function.arguments.extend(self._else_value.to_plan(session))
+# return fun
+
+args: Sequence[Expression] = []
+for condition, value in self._branches:
+args.append(condition)
+args.append(value)
+
+if self._else_value is not None:
+args.append(self._else_value)
+
+unresolved_function = UnresolvedFunction(name="when", args=args)
+
+return unresolved_function.to_plan(session)

Review Comment:
   @hvanhovell  @amaliujia 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] sunchao commented on a diff in pull request #38924: [SPARK-41398][SQL] Relax constraints on Storage-Partitioned Join when partition keys after runtime filtering do not match

2022-12-06 Thread GitBox


sunchao commented on code in PR #38924:
URL: https://github.com/apache/spark/pull/38924#discussion_r1041716400


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala:
##
@@ -81,18 +81,21 @@ case class BatchScanExec(
 
   val newRows = new InternalRowSet(p.expressions.map(_.dataType))
   newRows ++= 
newPartitions.map(_.asInstanceOf[HasPartitionKey].partitionKey())
-  val oldRows = p.partitionValuesOpt.get
 
-  if (oldRows.size != newRows.size) {
-throw new SparkException("Data source must have preserved the 
original partitioning " +
-"during runtime filtering: the number of unique partition 
values obtained " +
-s"through HasPartitionKey changed: before ${oldRows.size}, 
after ${newRows.size}")
+  val oldRows = p.partitionValuesOpt.get.toSet

Review Comment:
   Oops you're right! I forgot about `InternalRowSet`. Let me create a 
follow-up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on a diff in pull request #38956: [DO_NOT_MERGE] Implement Column.{when, otherwise} and Function when with UnresolvedFunction

2022-12-06 Thread GitBox


zhengruifeng commented on code in PR #38956:
URL: https://github.com/apache/spark/pull/38956#discussion_r1041716023


##
python/pyspark/sql/connect/column.py:
##
@@ -129,6 +140,53 @@ def name(self) -> str:
 ...
 
 
+class CaseWhen(Expression):
+def __init__(
+self, branches: Sequence[Tuple[Expression, Expression]], else_value: 
Optional[Expression]
+):
+
+assert isinstance(branches, list)
+for branch in branches:
+assert (
+isinstance(branch, tuple)
+and len(branch) == 2
+and all(isinstance(expr, Expression) for expr in branch)
+)
+self._branches = branches
+
+if else_value is not None:
+assert isinstance(else_value, Expression)
+
+self._else_value = else_value
+
+def to_plan(self, session: "SparkConnectClient") -> "proto.Expression":
+# fun = proto.Expression()
+# fun.unresolved_function.function_name = "when"
+# for condition, value in self._branches:
+# 
fun.unresolved_function.arguments.extend(condition.to_plan(session))
+# fun.unresolved_function.arguments.extend(value.to_plan(session))
+# if self._else_value is not None:
+# 
fun.unresolved_function.arguments.extend(self._else_value.to_plan(session))
+# return fun
+
+args: Sequence[Expression] = []
+for condition, value in self._branches:
+args.append(condition)
+args.append(value)
+
+if self._else_value is not None:
+args.append(self._else_value)
+
+unresolved_function = UnresolvedFunction(name="when", args=args)
+
+return unresolved_function.to_plan(session)

Review Comment:
   both `cdf.select(CF.when(cdf.a == 0, 1.0))` and `cdf.select(CF.when(cdf.a == 
0, 1.0).otherwise(2.0))` fail with this error:
   
   ```
   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC 
that terminated with:
   status = StatusCode.UNKNOWN
   details = "Invalid arguments for function when."
   debug_error_string = 
"{"created":"@1670381875.743425000","description":"Error received from peer 
ipv6:[::1]:15002","file":"src/core/lib/surface/call.cc","file_line":1064,"grpc_message":"Invalid
 arguments for function when.","grpc_status":2}"
   ```
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng opened a new pull request, #38956: [DO_NOT_MERGE] Implement Column.{when, otherwise} and Function when with UnresolvedFunction

2022-12-06 Thread GitBox


zhengruifeng opened a new pull request, #38956:
URL: https://github.com/apache/spark/pull/38956

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



  1   2   3   4   >