[GitHub] [spark] wankunde closed pull request #38672: [SPARK-41159][SQL] Optimize like any and like all expressions
wankunde closed pull request #38672: [SPARK-41159][SQL] Optimize like any and like all expressions URL: https://github.com/apache/spark/pull/38672 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] wankunde commented on pull request #38672: [SPARK-41159][SQL] Optimize like any and like all expressions
wankunde commented on PR #38672:
URL: https://github.com/apache/spark/pull/38672#issuecomment-1340543632

After `LikeSimplification`, a combination of multiple `like` expressions joined with `OR` can be pushed down to the Parquet reader, while `like any` cannot. So I am closing this PR.
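To illustrate the distinction made in the comment above, here is a minimal Python sketch of the kind of rewrite a rule like `LikeSimplification` performs: simple `LIKE` patterns become prefix, suffix, substring, or equality predicates, and a list of such predicates joined with `OR` is a shape that a file reader can typically accept as a pushed-down filter. All function names and the tuple representation are hypothetical and are not Spark's API; patterns containing `_` or an escape character are deliberately left untouched.

```python
from typing import List, Tuple

# Hypothetical representation of a predicate: (operator name, string literal).
Predicate = Tuple[str, str]


def simplify_like(pattern: str) -> Predicate:
    """Rewrite a simple LIKE pattern into a cheaper string predicate."""
    # Single-character wildcards and escapes are out of scope for this sketch.
    if "_" in pattern or "\\" in pattern:
        return ("like", pattern)
    body = pattern.strip("%")
    # Only wildcards, or a wildcard in the middle: keep the original LIKE.
    if body == "" or "%" in body:
        return ("like", pattern)
    leading = pattern.startswith("%")
    trailing = pattern.endswith("%")
    if leading and trailing:
        return ("contains", body)
    if trailing:
        return ("startswith", body)
    if leading:
        return ("endswith", body)
    return ("equals", body)


def like_any_as_or(patterns: List[str]) -> List[Predicate]:
    """Expand `col LIKE ANY (p1, p2, ...)` into a list of OR-ed predicates."""
    return [simplify_like(p) for p in patterns]


def matches_any(value: str, predicates: List[Predicate]) -> bool:
    """Evaluate the OR of the simplified predicates against one value."""
    checks = {
        "contains": lambda v, s: s in v,
        "startswith": lambda v, s: v.startswith(s),
        "endswith": lambda v, s: v.endswith(s),
        "equals": lambda v, s: v == s,
    }
    for op, _ in predicates:
        if op not in checks:
            raise NotImplementedError("general LIKE is not evaluated in this sketch")
    return any(checks[op](value, literal) for op, literal in predicates)
```

For example, `like_any_as_or(["sql%", "%-sql"])` yields a prefix predicate and a suffix predicate, and `matches_any("spark-sql", ...)` is true because the suffix predicate holds.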
[GitHub] [spark] beliefer opened a new pull request, #38962: [SPARK-40852][CONNECT][PYTHON] Add document for `DataFrame.summary`
beliefer opened a new pull request, #38962:
URL: https://github.com/apache/spark/pull/38962

### What changes were proposed in this pull request?
This PR adds documentation for `DataFrame.summary`.

### Why are the changes needed?
This PR adds documentation for `DataFrame.summary`.

### Does this PR introduce _any_ user-facing change?
No. `DataFrame.summary` is a new API.

### How was this patch tested?
N/A
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38880: [SPARK-38277][SS] Clear write batch after RocksDB state store's commit
HeartSaVioR commented on code in PR #38880:
URL: https://github.com/apache/spark/pull/38880#discussion_r1041871281

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -310,6 +311,9 @@ class RocksDB(
         "checkpoint" -> checkpointTimeMs,
         "fileSync" -> fileSyncTimeMs
       )
+      // reset resources as we already pushed the changes and it has been committed

Review Comment:
Actually, I intended to put this here, as I thought we had to clean up only when the commit succeeds. But it looks like it's also safe to place this in `finally`, since we invalidate the loaded version anyway if any exception happens. Will do.
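The trade-off discussed in this thread (clean up only after a successful commit versus clean up in `finally`) can be sketched in a few lines of Python. This is an illustrative toy, not the RocksDB state store: the class `WriteBatchStore`, its fields, and the `fail` flag are all hypothetical. It shows why cleanup in `finally` is safe when a failed commit also invalidates the loaded version, as the comment argues.

```python
class WriteBatchStore:
    """Toy stand-in for a store with a pending write batch (hypothetical)."""

    def __init__(self):
        self.pending = {}        # uncommitted writes, i.e. the "write batch"
        self.committed = {}      # durable state
        self.loaded_version = 0  # -1 means "invalid, reload before reuse"

    def put(self, key, value):
        self.pending[key] = value

    def commit(self, fail=False):
        try:
            if fail:
                raise IOError("simulated commit failure")
            self.committed.update(self.pending)
            self.loaded_version += 1
            return self.loaded_version
        except Exception:
            # A failed commit invalidates the loaded version, so stale
            # pending writes can never be reused by mistake.
            self.loaded_version = -1
            raise
        finally:
            # Runs on success and on failure: the batch never lingers.
            self.pending.clear()
```

With this shape, a caller that hits an exception sees both an empty batch and an invalid version, so it has to reload before writing again.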
[GitHub] [spark] grundprinzip commented on pull request #38879: [SPARK-41362][CONNECT][PYTHON] Better error messages for invalid argument types.
grundprinzip commented on PR #38879:
URL: https://github.com/apache/spark/pull/38879#issuecomment-1340539415

@HyukjinKwon @zhengruifeng @amaliujia more opinions?
[GitHub] [spark] zhengruifeng commented on pull request #38914: [SPARK-41381][CONNECT][PYTHON] Implement `count_distinct` and `sum_distinct` functions
zhengruifeng commented on PR #38914:
URL: https://github.com/apache/spark/pull/38914#issuecomment-1340537857

merged into master
[GitHub] [spark] cloud-fan commented on pull request #38942: [SPARK-41437][SQL] Do not optimize the input query twice for v1 write fallback
cloud-fan commented on PR #38942:
URL: https://github.com/apache/spark/pull/38942#issuecomment-1340537896

cc @viirya @gengliangwang
[GitHub] [spark] cloud-fan commented on a diff in pull request #38942: [SPARK-41437][SQL] Do not optimize the input query twice for v1 write fallback
cloud-fan commented on code in PR #38942:
URL: https://github.com/apache/spark/pull/38942#discussion_r1041868772

## sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala:
##
@@ -132,17 +132,21 @@ class V1WriteFallbackSuite extends QueryTest with SharedSparkSession with Before
     assert(e3.getMessage.contains("schema"))
   }
 
-  test("fallback writes should only analyze plan once") {
+  test("fallback writes should only analyze/optimize plan once") {

Review Comment:
```suggestion
  test("SPARK-41437: fallback writes should only analyze/optimize plan once") {
```
[GitHub] [spark] zhengruifeng closed pull request #38914: [SPARK-41381][CONNECT][PYTHON] Implement `count_distinct` and `sum_distinct` functions
zhengruifeng closed pull request #38914: [SPARK-41381][CONNECT][PYTHON] Implement `count_distinct` and `sum_distinct` functions
URL: https://github.com/apache/spark/pull/38914
[GitHub] [spark] jerrypeng commented on a diff in pull request #38880: [SPARK-38277][SS] Clear write batch after RocksDB state store's commit
jerrypeng commented on code in PR #38880:
URL: https://github.com/apache/spark/pull/38880#discussion_r1041868304

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -310,6 +311,9 @@ class RocksDB(
         "checkpoint" -> checkpointTimeMs,
         "fileSync" -> fileSyncTimeMs
       )
+      // reset resources as we already pushed the changes and it has been committed

Review Comment:
Should we do this in the `finally` clause, so that an exception or error cannot prevent it from running?
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041866576

## connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {
+    "message" : [
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in pre-fetched offset to end offset for each microbatch. ",
+      "topic-partitions for pre-fetched offset: , topic-partitions for end offset: ."
+    ]
+  },
+  "END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_PREFETCHED" : {
+    "message" : [
+      "For Kafka data source with Trigger.AvailableNow, end offset should have lower or equal offset per each topic partition than pre-fetched offset.",
+      "pre-fetched offset: , end offset: ."
+    ]
+  },
+  "LOST_TOPIC_PARTITIONS_IN_END_OFFSET_WITH_TRIGGER_AVAILABLENOW" : {
+    "message" : [
+      "Some of partitions in Kafka topic(s) have been lost during running query with Trigger.AvailableNow. Make sure topic partitions are not dropped during the query run.",

Review Comment:
Yup, maybe it's not a trivial case that users drop (and recreate) a topic that is being read by the running query. Better guidance may be to just say: restart your query and it will work.
[GitHub] [spark] MaxGekk commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
MaxGekk commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041866109

## connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {

Review Comment:
Since this could appear in paragraph headers in the docs, could you make it shorter, if possible? Also cc @srielau
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38961: [SPARK-41436][CONNECT][PYTHON] Implement `collection` functions: A~C
zhengruifeng commented on code in PR #38961:
URL: https://github.com/apache/spark/pull/38961#discussion_r1041847513

## python/pyspark/sql/tests/connect/test_connect_function.py:
##
@@ -413,6 +431,144 @@ def test_aggregation_functions(self):
             sdf.groupBy("a").agg(SF.percentile_approx(sdf.b, [0.1, 0.9])).toPandas(),
         )
 
+    def test_collection_functions(self):
+        from pyspark.sql import functions as SF
+        from pyspark.sql.connect import functions as CF
+
+        query = """
+            SELECT * FROM VALUES
+            (ARRAY('a', 'ab'), ARRAY(1, 2, 3), ARRAY(1, NULL, 3), 1, 2, 'a'),
+            (ARRAY('x', NULL), NULL, ARRAY(1, 3), 3, 4, 'x'),
+            (NULL, ARRAY(-1, -2, -3), Array(), 5, 6, NULL)
+            AS tab(a, b, c, d, e, f)
+            """
+        # +---------+------------+------------+---+---+----+
+        # |        a|           b|           c|  d|  e|   f|
+        # +---------+------------+------------+---+---+----+
+        # |  [a, ab]|   [1, 2, 3]|[1, null, 3]|  1|  2|   a|
+        # |[x, null]|        null|      [1, 3]|  3|  4|   x|
+        # |     null|[-1, -2, -3]|          []|  5|  6|null|
+        # +---------+------------+------------+---+---+----+
+
+        cdf = self.connect.sql(query)
+        sdf = self.spark.sql(query)
+
+        for cfunc, sfunc in [
+            (CF.array_distinct, SF.array_distinct),
+            (CF.array_max, SF.array_max),
+            (CF.array_min, SF.array_min),
+        ]:
+            self.assert_eq(
+                cdf.select(cfunc("a"), cfunc(cdf.b)).toPandas(),
+                sdf.select(sfunc("a"), sfunc(sdf.b)).toPandas(),
+            )
+
+        for cfunc, sfunc in [
+            (CF.array_except, SF.array_except),
+            (CF.array_intersect, SF.array_intersect),
+            (CF.array_union, SF.array_union),
+            (CF.arrays_overlap, SF.arrays_overlap),
+        ]:
+            self.assert_eq(
+                cdf.select(cfunc("b", cdf.c)).toPandas(),
+                sdf.select(sfunc("b", sdf.c)).toPandas(),
+            )
+
+        for cfunc, sfunc in [
+            (CF.array_position, SF.array_position),
+            (CF.array_remove, SF.array_remove),
+        ]:
+            self.assert_eq(
+                cdf.select(cfunc(cdf.a, "ab")).toPandas(),
+                sdf.select(sfunc(sdf.a, "ab")).toPandas(),
+            )
+
+        # test array
+        self.assert_eq(
+            cdf.select(CF.array(cdf.d, "e")).toPandas(),
+            sdf.select(SF.array(sdf.d, "e")).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.array(cdf.d, "e", CF.lit(99))).toPandas(),
+            sdf.select(SF.array(sdf.d, "e", SF.lit(99))).toPandas(),
+        )
+
+        # test array_contains
+        self.assert_eq(
+            cdf.select(CF.array_contains(cdf.a, "ab")).toPandas(),
+            sdf.select(SF.array_contains(sdf.a, "ab")).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.array_contains(cdf.a, cdf.f)).toPandas(),
+            sdf.select(SF.array_contains(sdf.a, sdf.f)).toPandas(),
+        )
+
+        # test array_join
+        self.assert_eq(
+            cdf.select(
+                CF.array_join(cdf.a, ","), CF.array_join("b", ":"), CF.array_join("c", "~")
+            ).toPandas(),
+            sdf.select(
+                SF.array_join(sdf.a, ","), SF.array_join("b", ":"), SF.array_join("c", "~")
+            ).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(
+                CF.array_join(cdf.a, ",", "_null_"),
+                CF.array_join("b", ":", ".null."),
+                CF.array_join("c", "~", "NULL"),
+            ).toPandas(),
+            sdf.select(
+                SF.array_join(sdf.a, ",", "_null_"),
+                SF.array_join("b", ":", ".null."),
+                SF.array_join("c", "~", "NULL"),
+            ).toPandas(),
+        )
+
+        # test array_repeat
+        self.assert_eq(
+            cdf.select(CF.array_repeat(cdf.f, "d")).toPandas(),
+            sdf.select(SF.array_repeat(sdf.f, "d")).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(CF.array_repeat("f", cdf.d)).toPandas(),
+            sdf.select(SF.array_repeat("f", sdf.d)).toPandas(),
+        )
+        # TODO: Make Literal contains DataType
+        # Cannot resolve "array_repeat(f, 3)" due to data type mismatch:

Review Comment:
I think we may need to revisit https://github.com/apache/spark/pull/38800#discussion_r1033281998 at some point. We had better specify the `DataType` of `3` as IntegerType instead of LongType. Alternatively, we can use `Cast` as a workaround once `Cast` is ready.
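The suggestion in this comment, attaching an explicit and sufficiently narrow type to an integer literal, can be sketched in plain Python. This is only an illustration of the idea, not how Spark Connect builds or serializes literals: the helper `infer_integral_type` is hypothetical, and the type names are returned as plain strings.

```python
INT_MIN, INT_MAX = -(2 ** 31), 2 ** 31 - 1
LONG_MIN, LONG_MAX = -(2 ** 63), 2 ** 63 - 1


def infer_integral_type(value: int) -> str:
    """Pick the narrowest integral type name that can hold `value`.

    Hypothetical helper: a literal such as 3 maps to "IntegerType", so a
    function that expects an int count argument would not see a long.
    """
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(value, bool) or not isinstance(value, int):
        raise TypeError("expected an int literal")
    if INT_MIN <= value <= INT_MAX:
        return "IntegerType"
    if LONG_MIN <= value <= LONG_MAX:
        return "LongType"
    raise ValueError("literal does not fit in a 64-bit integer")
```

Under this rule `3` would be typed as `IntegerType`, while a value such as `2 ** 31` would still fall back to `LongType`.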
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041864100

## connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+
+    if (tpsForPrefetched != tpsForEndOffset) {
+      throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+        tpsForPrefetched, tpsForEndOffset)
+    }
+
+    val endOffsetHasGreaterThanPrefetched = {
+      allDataForTriggerAvailableNow.keySet.exists { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset > offsetFromPrefetched
+      }
+    }
+    if (endOffsetHasGreaterThanPrefetched) {
+      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+        allDataForTriggerAvailableNow, endPartitionOffsets)
+    }
+
+    val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
It's not safe to just pretend that Kafka resources won't change during the query run. If they do, we end up with unexpected behavior; e.g. with consumer-based grouping, polling from a non-existent topic partition may lead to a timeout of the metadata fetch.
[GitHub] [spark] MaxGekk commented on a diff in pull request #38937: [SPARK-41406][SQL] Refactor error message for `NUM_COLUMNS_MISMATCH` to make it more generic
MaxGekk commented on code in PR #38937:
URL: https://github.com/apache/spark/pull/38937#discussion_r1041863502

## sql/core/src/test/resources/sql-tests/results/except-all.sql.out:
##
@@ -230,10 +230,9 @@ org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "NUM_COLUMNS_MISMATCH",
   "messageParameters" : {
-    "invalidNumColumns" : "2",
-    "invalidOrdinalNum" : "second",
+    "leftNumCols" : "1",
     "operator" : "EXCEPTALL",

Review Comment:
Hmm, could you output it as `EXCEPT ALL`?
[GitHub] [spark] jerrypeng commented on pull request #38898: [SPARK-41375][SS] Avoid empty latest KafkaSourceOffset
jerrypeng commented on PR #38898:
URL: https://github.com/apache/spark/pull/38898#issuecomment-1340530153

@wecharyu can you run one batch and then delete all the partitions?
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041860498

## connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+
+    if (tpsForPrefetched != tpsForEndOffset) {
+      throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+        tpsForPrefetched, tpsForEndOffset)
+    }
+
+    val endOffsetHasGreaterThanPrefetched = {
+      allDataForTriggerAvailableNow.keySet.exists { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset > offsetFromPrefetched
+      }
+    }
+    if (endOffsetHasGreaterThanPrefetched) {
+      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+        allDataForTriggerAvailableNow, endPartitionOffsets)
+    }
+
+    val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
If we don't fetch the latest offsets with the latest topic-partitions from Kafka again, what are we trying to guard against? If someone turns on failOnDataLoss and runs the query with Trigger.AvailableNow, it will just move on when a specific topic / partition is dropped, so it never reaches the end state (prepared offset).
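The checks under review can be restated as a short Python sketch. The first two checks mirror the Scala shown in the diff above. The third check (lost partitions) is an assumption: the diff is cut off right after `fetchLatestOffsets`, so the comparison against the freshly fetched offsets is inferred from the `LOST_TOPIC_PARTITIONS_IN_END_OFFSET_WITH_TRIGGER_AVAILABLENOW` error class and may differ from the actual PR. The function name, the `EndOffsetError` class, and the `latest` parameter are hypothetical.

```python
from typing import Dict, Optional, Tuple

TopicPartition = Tuple[str, int]  # (topic, partition), a hypothetical stand-in


class EndOffsetError(Exception):
    """Carries the name of the violated error class."""


def assert_end_offset(
    prefetched: Dict[TopicPartition, int],
    end_offsets: Dict[TopicPartition, int],
    latest: Optional[Dict[TopicPartition, int]] = None,
) -> None:
    # Check 1: both sides must cover exactly the same topic partitions.
    if set(prefetched) != set(end_offsets):
        raise EndOffsetError(
            "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED")

    # Check 2: no end offset may exceed the pre-fetched offset.
    if any(end_offsets[tp] > prefetched[tp] for tp in prefetched):
        raise EndOffsetError(
            "END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_PREFETCHED")

    # Check 3 (assumed): every partition must still exist in the offsets
    # that were just re-fetched from the broker.
    if latest is not None and set(end_offsets) - set(latest):
        raise EndOffsetError(
            "LOST_TOPIC_PARTITIONS_IN_END_OFFSET_WITH_TRIGGER_AVAILABLENOW")
```

The extra fetch is what makes the third check possible: without a fresh view of the broker, a dropped partition cannot be distinguished from one that simply has no new data.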
[GitHub] [spark] jerrypeng commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
jerrypeng commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041856979

## connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+
+    if (tpsForPrefetched != tpsForEndOffset) {
+      throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+        tpsForPrefetched, tpsForEndOffset)
+    }
+
+    val endOffsetHasGreaterThanPrefetched = {
+      allDataForTriggerAvailableNow.keySet.exists { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset > offsetFromPrefetched
+      }
+    }
+    if (endOffsetHasGreaterThanPrefetched) {
+      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+        allDataForTriggerAvailableNow, endPartitionOffsets)
+    }
+
+    val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
Why do we need to fetch the latest offset from Kafka again? This will add additional latency. I know it's Trigger.AvailableNow, but what does this buy us?
[GitHub] [spark] LuciferYang commented on pull request #38874: [SPARK-41235][SQL][PYTHON]High-order function: array_compact implementation
LuciferYang commented on PR #38874:
URL: https://github.com/apache/spark/pull/38874#issuecomment-1340524315

> Thanks for reviewing this. @LuciferYang let me know when you think it's ready to go.

@HyukjinKwon @zhengruifeng The Scala part looks good to me. Please review further, thanks ~
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041856552

## connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {
+    "message" : [
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in pre-fetched offset to end offset for each microbatch. ",

Review Comment:
We are guarding against arbitrary bugs, so there is no 100% guarantee that this would only happen transiently, but I agree that restarting the query would mitigate the issue in most cases.
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38961: [SPARK-41436][CONNECT][PYTHON] Implement `collection` functions: A~C
zhengruifeng commented on code in PR #38961:
URL: https://github.com/apache/spark/pull/38961#discussion_r1041844991

## python/pyspark/sql/tests/connect/test_connect_function.py: ##
@@ -63,6 +63,24 @@ class SparkConnectFunctionTests(SparkConnectFuncTestCase):
     """These test cases exercise the interface to the proto plan generation but do not call Spark."""
 
+    def compare_by_show(self, df1: Any, df2: Any):

Review Comment:
`toPandas` doesn't work correctly on some complex types like `array(struct)` and `array(map)`, so this method is added to check by `DataFrame.show` for now...
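The idea behind `compare_by_show` in the comment above can be sketched roughly as follows. This is not the helper from the PR: it assumes only that `show` prints a table to standard output, and `show_text` and `FakeFrame` are hypothetical names introduced here so the sketch runs without a Spark session.

```python
import io
from contextlib import redirect_stdout


def show_text(df, n=20):
    """Capture whatever df.show(n) prints and return it as a string."""
    buf = io.StringIO()
    with redirect_stdout(buf):
        df.show(n)
    return buf.getvalue()


def compare_by_show(df1, df2, n=20):
    """Assert that two DataFrame-like objects print the same table."""
    out1, out2 = show_text(df1, n), show_text(df2, n)
    assert out1 == out2, f"show() output differs:\n{out1}\nvs\n{out2}"


class FakeFrame:
    """Hypothetical stand-in with a show() method (not a Spark class)."""

    def __init__(self, rows):
        self.rows = rows

    def show(self, n=20):
        # Print at most n rows, one per line.
        for row in self.rows[:n]:
            print(row)
```

Comparing the printed tables sidesteps the conversion to pandas entirely, at the cost of only checking the rows that `show` actually prints.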
[GitHub] [spark] jerrypeng commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
jerrypeng commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041848809

## connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json: ##
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {
+    "message" : [
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in pre-fetched offset to end offset for each microbatch. ",
+      "topic-partitions for pre-fetched offset: , topic-partitions for end offset: ."
+    ]
+  },
+  "END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_PREFETCHED" : {
+    "message" : [
+      "For Kafka data source with Trigger.AvailableNow, end offset should have lower or equal offset per each topic partition than pre-fetched offset.",
+      "pre-fetched offset: , end offset: ."
+    ]
+  },
+  "LOST_TOPIC_PARTITIONS_IN_END_OFFSET_WITH_TRIGGER_AVAILABLENOW" : {
+    "message" : [
+      "Some of partitions in Kafka topic(s) have been lost during running query with Trigger.AvailableNow. Make sure topic partitions are not dropped during the query run.",

Review Comment:
Same here. Transient error, right?
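The first two conditions in the error messages above can be illustrated with a small sketch. The error names come from the diff; the function name `check_end_offset`, the dictionary representation of offsets, and the checking logic itself are assumptions made for illustration, not the connector's implementation.

```python
def check_end_offset(prefetched, end):
    """Validate an end offset against the pre-fetched offset.

    Both arguments map (topic, partition) -> offset. This is an
    illustrative model only; the real checks live in the Kafka source.
    """
    # The end offset must cover exactly the pre-fetched topic partitions.
    if set(prefetched) != set(end):
        raise ValueError(
            "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED: "
            f"prefetched={sorted(prefetched)}, end={sorted(end)}"
        )
    # Per partition, the end offset must not exceed the pre-fetched offset.
    for tp, end_off in end.items():
        if end_off > prefetched[tp]:
            raise ValueError(
                "END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_PREFETCHED: "
                f"{tp}: prefetched={prefetched[tp]}, end={end_off}"
            )
```

For example, `check_end_offset({("t", 0): 10}, {("t", 0): 7})` passes silently, while an end offset of 11 for the same partition raises the second error.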
[GitHub] [spark] jerrypeng commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
jerrypeng commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041848355

## connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json: ##
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {
+    "message" : [
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in pre-fetched offset to end offset for each microbatch. ",

Review Comment:
This should be a transient issue, right? As in, when the job is triggered again, it should run fine and read from whichever partitions exist? Can you add that to this error message?
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38961: [SPARK-41436][CONNECT][PYTHON] Implement `collection` functions: A~C
zhengruifeng commented on code in PR #38961: URL: https://github.com/apache/spark/pull/38961#discussion_r1041847513 ## python/pyspark/sql/tests/connect/test_connect_function.py: ## @@ -413,6 +431,144 @@ def test_aggregation_functions(self): sdf.groupBy("a").agg(SF.percentile_approx(sdf.b, [0.1, 0.9])).toPandas(), ) +def test_collection_functions(self): +from pyspark.sql import functions as SF +from pyspark.sql.connect import functions as CF + +query = """ +SELECT * FROM VALUES +(ARRAY('a', 'ab'), ARRAY(1, 2, 3), ARRAY(1, NULL, 3), 1, 2, 'a'), +(ARRAY('x', NULL), NULL, ARRAY(1, 3), 3, 4, 'x'), +(NULL, ARRAY(-1, -2, -3), Array(), 5, 6, NULL) +AS tab(a, b, c, d, e, f) +""" +# +-+++---+---++ +# |a| b| c| d| e| f| +# +-+++---+---++ +# | [a, ab]| [1, 2, 3]|[1, null, 3]| 1| 2| a| +# |[x, null]|null| [1, 3]| 3| 4| x| +# | null|[-1, -2, -3]| []| 5| 6|null| +# +-+++---+---++ + +cdf = self.connect.sql(query) +sdf = self.spark.sql(query) + +for cfunc, sfunc in [ +(CF.array_distinct, SF.array_distinct), +(CF.array_max, SF.array_max), +(CF.array_min, SF.array_min), +]: +self.assert_eq( +cdf.select(cfunc("a"), cfunc(cdf.b)).toPandas(), +sdf.select(sfunc("a"), sfunc(sdf.b)).toPandas(), +) + +for cfunc, sfunc in [ +(CF.array_except, SF.array_except), +(CF.array_intersect, SF.array_intersect), +(CF.array_union, SF.array_union), +(CF.arrays_overlap, SF.arrays_overlap), +]: +self.assert_eq( +cdf.select(cfunc("b", cdf.c)).toPandas(), +sdf.select(sfunc("b", sdf.c)).toPandas(), +) + +for cfunc, sfunc in [ +(CF.array_position, SF.array_position), +(CF.array_remove, SF.array_remove), +]: +self.assert_eq( +cdf.select(cfunc(cdf.a, "ab")).toPandas(), +sdf.select(sfunc(sdf.a, "ab")).toPandas(), +) + +# test array +self.assert_eq( +cdf.select(CF.array(cdf.d, "e")).toPandas(), +sdf.select(SF.array(sdf.d, "e")).toPandas(), +) +self.assert_eq( +cdf.select(CF.array(cdf.d, "e", CF.lit(99))).toPandas(), +sdf.select(SF.array(sdf.d, "e", SF.lit(99))).toPandas(), +) + +# test array_contains 
+self.assert_eq( +cdf.select(CF.array_contains(cdf.a, "ab")).toPandas(), +sdf.select(SF.array_contains(sdf.a, "ab")).toPandas(), +) +self.assert_eq( +cdf.select(CF.array_contains(cdf.a, cdf.f)).toPandas(), +sdf.select(SF.array_contains(sdf.a, sdf.f)).toPandas(), +) + +# test array_join +self.assert_eq( +cdf.select( +CF.array_join(cdf.a, ","), CF.array_join("b", ":"), CF.array_join("c", "~") +).toPandas(), +sdf.select( +SF.array_join(sdf.a, ","), SF.array_join("b", ":"), SF.array_join("c", "~") +).toPandas(), +) +self.assert_eq( +cdf.select( +CF.array_join(cdf.a, ",", "_null_"), +CF.array_join("b", ":", ".null."), +CF.array_join("c", "~", "NULL"), +).toPandas(), +sdf.select( +SF.array_join(sdf.a, ",", "_null_"), +SF.array_join("b", ":", ".null."), +SF.array_join("c", "~", "NULL"), +).toPandas(), +) + +# test array_repeat +self.assert_eq( +cdf.select(CF.array_repeat(cdf.f, "d")).toPandas(), +sdf.select(SF.array_repeat(sdf.f, "d")).toPandas(), +) +self.assert_eq( +cdf.select(CF.array_repeat("f", cdf.d)).toPandas(), +sdf.select(SF.array_repeat("f", sdf.d)).toPandas(), +) +# TODO: Make Literal contains DataType +# Cannot resolve "array_repeat(f, 3)" due to data type mismatch: Review Comment: I think we may need to revisit https://github.com/apache/spark/pull/38800#discussion_r1033281998 at some point, we'd better specify the `Datatype` of `3` to IntegerType instead of LongType. or we can use `Cast` as a workaround. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure a
[GitHub] [spark] huaxingao commented on a diff in pull request #38904: [SPARK-41378][SQL] Support Column Stats in DS v2
huaxingao commented on code in PR #38904:
URL: https://github.com/apache/spark/pull/38904#discussion_r1041841165

## sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala: ##
@@ -294,7 +313,30 @@ abstract class InMemoryBaseTable(
     val objectHeaderSizeInBytes = 12L
     val rowSizeInBytes = objectHeaderSizeInBytes + schema.defaultSize
     val sizeInBytes = numRows * rowSizeInBytes
-    InMemoryStats(OptionalLong.of(sizeInBytes), OptionalLong.of(numRows))
+
+    val map = new util.HashMap[NamedReference, ColumnStatistics]()
+    val colNames = readSchema.fields.map(_.name)
+    for (col <- colNames) {
+      val fieldReference = FieldReference(col)
+      // put some fake data for testing only
+      val bin1 = InMemoryHistogramBin(1, 2, 5L)
+      val bin2 = InMemoryHistogramBin(3, 4, 5L)
+      val bin3 = InMemoryHistogramBin(5, 6, 5L)
+      val bin4 = InMemoryHistogramBin(7, 8, 5L)
+      val bin5 = InMemoryHistogramBin(9, 10, 5L)

Review Comment:
I removed the fake data and computed NDV and null count for testing purposes.
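As a rough illustration of the two statistics mentioned in the comment above, NDV (number of distinct values) and null count can be computed from a column's values as sketched below. The function name `column_stats` and the choice not to count null as a distinct value are assumptions, not taken from the PR.

```python
def column_stats(values):
    """Return the distinct-value count (NDV) and null count of a column.

    `values` is a plain list in which None stands for SQL NULL.
    Nulls are excluded from the distinct count (an assumption here).
    """
    non_null = [v for v in values if v is not None]
    return {
        "distinct_count": len(set(non_null)),
        "null_count": len(values) - len(non_null),
    }
```

With this sketch, `column_stats([1, 1, None, 2, None])` yields a distinct count of 2 and a null count of 2.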
[GitHub] [spark] huaxingao commented on pull request #38904: [SPARK-41378][SQL] Support Column Stats in DS v2
huaxingao commented on PR #38904:
URL: https://github.com/apache/spark/pull/38904#issuecomment-1340511363

> Also curious how this is to be used by Spark

The newly added `ColumnStatistics` is converted to the logical `ColumnStat` in this [method](https://github.com/apache/spark/blob/0cddab9a618dc185efc2424ea934af5aa565a213/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala#L213) and is used in CBO.
[GitHub] [spark] LuciferYang commented on a diff in pull request #38874: [SPARK-41235][SQL][PYTHON]High-order function: array_compact implementation
LuciferYang commented on code in PR #38874:
URL: https://github.com/apache/spark/pull/38874#discussion_r1041845222

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala: ##
@@ -4600,3 +4600,51 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL
   override protected def withNewChildrenInternal(
     newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = newLeft, right = newRight)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(array) - Removes null values from the array.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(1, 2, 3, null));
+       [1,2,3]
+  """,
+  group = "array_funcs",
+  since = "3.4.0")
+case class ArrayCompact(child: Expression)
+  extends UnaryExpression with ExpectsInputTypes with NullIntolerant {
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)
+  override def dataType: DataType = child.dataType
+
+  @transient private lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType
+  override def checkInputDataTypes(): TypeCheckResult = {
+    super.checkInputDataTypes() match {
+      case f if f.isFailure => f
+      case TypeCheckResult.TypeCheckSuccess =>
+        TypeUtils.checkForOrderingExpr(elementType, prettyName)
+    }
+  }
+
+  override def nullSafeEval(array: Any): Any = {
+    val newArray = new Array[Any](array.asInstanceOf[ArrayData].numElements())

Review Comment:
I think `Array` is ok
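The behaviour described by the usage string above ("Removes null values from the array") can be modelled in a few lines. This is a plain-Python model of the semantics, not the Catalyst implementation; `None` stands in for SQL NULL, and returning `None` for a null array reflects the null-safe evaluation of a unary expression.

```python
def array_compact(arr):
    """Model of array_compact: drop null elements, keep order.

    A null array yields null, mirroring null-safe evaluation.
    """
    if arr is None:
        return None
    return [x for x in arr if x is not None]
```

This mirrors the example in the diff: `array_compact([1, 2, 3, None])` gives `[1, 2, 3]`.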
[GitHub] [spark] huaxingao commented on a diff in pull request #38904: [SPARK-41378][SQL] Support Column Stats in DS v2
huaxingao commented on code in PR #38904:
URL: https://github.com/apache/spark/pull/38904#discussion_r1041841092

## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Statistics.java: ##
@@ -31,4 +35,7 @@ public interface Statistics {
   OptionalLong sizeInBytes();
   OptionalLong numRows();
+  default Optional> columnStats() {

Review Comment:
Changed. Thanks

## sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala: ##
@@ -294,7 +313,30 @@ abstract class InMemoryBaseTable(
     val objectHeaderSizeInBytes = 12L
     val rowSizeInBytes = objectHeaderSizeInBytes + schema.defaultSize
     val sizeInBytes = numRows * rowSizeInBytes
-    InMemoryStats(OptionalLong.of(sizeInBytes), OptionalLong.of(numRows))
+
+    val map = new util.HashMap[NamedReference, ColumnStatistics]()
+    val colNames = readSchema.fields.map(_.name)
+    for (col <- colNames) {
+      val fieldReference = FieldReference(col)
+      // put some fake data for testing only
+      val bin1 = InMemoryHistogramBin(1, 2, 5L)
+      val bin2 = InMemoryHistogramBin(3, 4, 5L)
+      val bin3 = InMemoryHistogramBin(5, 6, 5L)
+      val bin4 = InMemoryHistogramBin(7, 8, 5L)
+      val bin5 = InMemoryHistogramBin(9, 10, 5L)

Review Comment:
I computed NDV and null count for testing purposes.
[GitHub] [spark] huaxingao commented on a diff in pull request #38904: [SPARK-41378][SQL] Support Column Stats in DS v2
huaxingao commented on code in PR #38904: URL: https://github.com/apache/spark/pull/38904#discussion_r1041840929 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/colstats/ColumnStatistics.java: ## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read.colstats; Review Comment: The classes inside this package are for Column Stats. There is one existing class `Statistics`, I can't group it in the new package. It's probably better to call `colstats`. ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/colstats/Histogram.java: ## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read.colstats; + +import org.apache.spark.annotation.Evolving; + +/** + * An interface to represent an equi-height histogram, which is a part of + * {@link ColumnStatistics}. Equi-height histogram represents the distribution of + * a column's values by a sequence of bins. + * + * @since 3.4.0 + */ +@Evolving +public interface Histogram { + double height(); Review Comment: Added -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
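The Javadoc above describes an equi-height histogram as a sequence of bins representing the distribution of a column's values. A minimal sketch of that idea follows, assuming each bin records a lower bound, an upper bound, and a distinct count, and that `height` is the number of rows per bin. The function name and the tuple layout are hypothetical, not the `Histogram` interface from the PR.

```python
def equi_height_histogram(values, num_bins):
    """Split sorted non-null values into bins of (roughly) equal row count.

    Returns (height, bins), where each bin is (lo, hi, ndv).
    """
    data = sorted(v for v in values if v is not None)
    if not data or num_bins <= 0:
        return 0.0, []
    height = len(data) / num_bins  # rows per bin
    bins = []
    for i in range(num_bins):
        start = round(i * height)
        end = round((i + 1) * height)
        chunk = data[start:end]
        if chunk:
            # Record the bin's bounds and its number of distinct values.
            bins.append((chunk[0], chunk[-1], len(set(chunk))))
    return height, bins
```

For the values 1 through 10 and five bins, this produces a height of 2.0 and bins covering 1 to 2, 3 to 4, and so on up to 9 to 10, the same bounds as the placeholder bins used in the in-memory test table.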
[GitHub] [spark] huaxingao commented on a diff in pull request #38904: [SPARK-41378][SQL] Support Column Stats in DS v2
huaxingao commented on code in PR #38904: URL: https://github.com/apache/spark/pull/38904#discussion_r1041840770 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/colstats/ColumnStatistics.java: ## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read.colstats; + +import org.apache.spark.annotation.Evolving; +import java.math.BigInteger; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * An interface to represent column statistics, which is part of + * {@link Statistics}. + * + * @since 3.4.0 + */ +@Evolving +public interface ColumnStatistics { + default Optional distinctCount() { +return Optional.empty(); + } + + default Optional min() { +return Optional.empty(); + } + + default Optional max() { +return Optional.empty(); + } + + default Optional nullCount() { +return Optional.empty(); + } + + default OptionalLong avgLen() { Review Comment: Comments added. Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [spark] zhengruifeng opened a new pull request, #38961: [SPARK-41436][CONNECT][PYTHON] Implement `collection` functions: A~C
zhengruifeng opened a new pull request, #38961:
URL: https://github.com/apache/spark/pull/38961

### What changes were proposed in this pull request?
Implement `collection` functions alphabetically. This PR contains `A` ~ `C`, except:
- aggregate, array_sort - need the support of LambdaFunction Expression

### Why are the changes needed?
For API coverage

### Does this PR introduce _any_ user-facing change?
new APIs

### How was this patch tested?
added UT
[GitHub] [spark] huaxingao commented on a diff in pull request #38904: [SPARK-41378][SQL] Support Column Stats in DS v2
huaxingao commented on code in PR #38904: URL: https://github.com/apache/spark/pull/38904#discussion_r1041840529 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/colstats/ColumnStatistics.java: ## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read.colstats; + +import org.apache.spark.annotation.Evolving; +import java.math.BigInteger; +import java.util.Optional; +import java.util.OptionalLong; Review Comment: Fixed. Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] LuciferYang opened a new pull request, #38960: [SPARK-41435][SQL] Make `curdate()` throw `WRONG_NUM_ARGS ` when args is not null
LuciferYang opened a new pull request, #38960:
URL: https://github.com/apache/spark/pull/38960

### What changes were proposed in this pull request?
`curdate()` throws `QueryCompilationErrors.invalidFunctionArgumentNumberError` with `Seq.empty` as input when `expressions` is not empty, so the error class is `_LEGACY_ERROR_TEMP_1043`:

```json
"_LEGACY_ERROR_TEMP_1043" : {
  "message" : [
    "Invalid arguments for function ."
  ]
}
```

This PR changes the first input argument from `Seq.empty` to `Seq(0)` so that the error class becomes `WRONG_NUM_ARGS`:

```json
"WRONG_NUM_ARGS" : {
  "message" : [
    "The requires parameters but the actual number is ."
  ]
}
```

### Why are the changes needed?
`WRONG_NUM_ARGS` is a more appropriate error class

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add new test case
[GitHub] [spark] zhengruifeng commented on pull request #38958: [SPARK-41433][CONNECT] Make Max Arrow BatchSize configurable
zhengruifeng commented on PR #38958:
URL: https://github.com/apache/spark/pull/38958#issuecomment-1340494395

cc @grundprinzip @hvanhovell
[GitHub] [spark] zhengruifeng commented on pull request #38914: [SPARK-41381][CONNECT][PYTHON] Implement `count_distinct` and `sum_distinct` functions
zhengruifeng commented on PR #38914:
URL: https://github.com/apache/spark/pull/38914#issuecomment-1340493981

also cc @cloud-fan @grundprinzip
[GitHub] [spark] LuciferYang commented on a diff in pull request #38865: [SPARK-41232][SQL][PYTHON] Adding array_append function
LuciferYang commented on code in PR #38865: URL: https://github.com/apache/spark/pull/38865#discussion_r1041827973 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala: ## @@ -4600,3 +4600,133 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL override protected def withNewChildrenInternal( newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = newLeft, right = newRight) } + +/** + * Given an array, and another element append the element at the end of the array. + */ +@ExpressionDescription( + usage = "_FUNC_(array, element) - Append the element", + examples = +""" +Examples: + > SELECT _FUNC_(array('b', 'd', 'c', 'a'), 'd'); + ["b","d","c","a","d"] + + """, + since = "3.4.0", + group = "array_funcs") +case class ArrayAppend(left: Expression, right: Expression) + extends BinaryExpression + with ImplicitCastInputTypes + with ComplexTypeMergingExpression + with QueryErrorsBase { + override def prettyName: String = "array_append" + + override def inputTypes: Seq[AbstractDataType] = { +(left.dataType, right.dataType) match { + case (ArrayType(e1, hasNull), e2) => +TypeCoercion.findTightestCommonType(e1, e2) match { + case Some(dt) => Seq(ArrayType(dt, hasNull), dt) + case _ => Seq.empty +} + case _ => Seq.empty +} + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +val value2 = right.eval(input) +if (value1 == null) { + null +} else { + nullSafeEval(value1, value2) +} + } + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (ArrayType(e1, _), e2) => if (e1.sameType(e2)) { +TypeCheckResult.TypeCheckSuccess + } + else { +DataTypeMismatch( + errorSubClass = "ARRAY_FUNCTION_DIFF_TYPES", + messageParameters = Map( +"functionName" -> toSQLId(prettyName), +"leftType" -> toSQLType(left.dataType), +"rightType" -> toSQLType(right.dataType), +"dataType" -> toSQLType(ArrayType) + )) + } + case _ => 
+DataTypeMismatch( + errorSubClass = "UNEXPECTED_INPUT_TYPE", + messageParameters = Map( +"paramIndex" -> "0", +"requiredType" -> toSQLType(ArrayType), +"inputSql" -> toSQLExpr(left), +"inputType" -> toSQLType(left.dataType) + ) +) +} + } + + protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): ArrayAppend = +copy(left = newLeft, right = newRight) + + override protected def nullSafeEval(input1: Any, input2: Any): Any = { +val arrayData = input1.asInstanceOf[ArrayData] +val arrayElementType = dataType.asInstanceOf[ArrayType].elementType +val elementData = input2 +val numberOfElements = arrayData.numElements() + 1 +if (numberOfElements > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw QueryExecutionErrors.concatArraysWithElementsExceedLimitError(numberOfElements) +} +val finalData = new Array[Any](numberOfElements) +arrayData.foreach(arrayElementType, finalData.update) +finalData.update(numberOfElements - 1, elementData) +new GenericArrayData(finalData) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + +val f = (left: String, right: String) => { + val expr = ctx.addReferenceObj("arraysAppendExpr", this) + s"${ev.value} = (ArrayData)$expr.nullSafeEval($left, $right);" +} + +val leftGen = left.genCode(ctx) +val rightGen = right.genCode(ctx) Review Comment: WDYT @zhengruifeng ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] LuciferYang commented on a diff in pull request #38865: [SPARK-41232][SQL][PYTHON] Adding array_append function
LuciferYang commented on code in PR #38865: URL: https://github.com/apache/spark/pull/38865#discussion_r1041827496 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala: ## @@ -4600,3 +4600,133 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL override protected def withNewChildrenInternal( newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = newLeft, right = newRight) } + +/** + * Given an array, and another element append the element at the end of the array. + */ +@ExpressionDescription( + usage = "_FUNC_(array, element) - Append the element", + examples = +""" +Examples: + > SELECT _FUNC_(array('b', 'd', 'c', 'a'), 'd'); + ["b","d","c","a","d"] + + """, + since = "3.4.0", + group = "array_funcs") +case class ArrayAppend(left: Expression, right: Expression) + extends BinaryExpression + with ImplicitCastInputTypes + with ComplexTypeMergingExpression + with QueryErrorsBase { + override def prettyName: String = "array_append" + + override def inputTypes: Seq[AbstractDataType] = { +(left.dataType, right.dataType) match { + case (ArrayType(e1, hasNull), e2) => +TypeCoercion.findTightestCommonType(e1, e2) match { + case Some(dt) => Seq(ArrayType(dt, hasNull), dt) + case _ => Seq.empty +} + case _ => Seq.empty +} + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +val value2 = right.eval(input) +if (value1 == null) { + null +} else { + nullSafeEval(value1, value2) +} + } + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (ArrayType(e1, _), e2) => if (e1.sameType(e2)) { +TypeCheckResult.TypeCheckSuccess + } + else { +DataTypeMismatch( + errorSubClass = "ARRAY_FUNCTION_DIFF_TYPES", + messageParameters = Map( +"functionName" -> toSQLId(prettyName), +"leftType" -> toSQLType(left.dataType), +"rightType" -> toSQLType(right.dataType), +"dataType" -> toSQLType(ArrayType) + )) + } + case _ => 
+DataTypeMismatch( + errorSubClass = "UNEXPECTED_INPUT_TYPE", + messageParameters = Map( +"paramIndex" -> "0", +"requiredType" -> toSQLType(ArrayType), +"inputSql" -> toSQLExpr(left), +"inputType" -> toSQLType(left.dataType) + ) +) +} + } + + protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): ArrayAppend = +copy(left = newLeft, right = newRight) + + override protected def nullSafeEval(input1: Any, input2: Any): Any = { +val arrayData = input1.asInstanceOf[ArrayData] +val arrayElementType = dataType.asInstanceOf[ArrayType].elementType +val elementData = input2 +val numberOfElements = arrayData.numElements() + 1 +if (numberOfElements > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw QueryExecutionErrors.concatArraysWithElementsExceedLimitError(numberOfElements) +} +val finalData = new Array[Any](numberOfElements) +arrayData.foreach(arrayElementType, finalData.update) +finalData.update(numberOfElements - 1, elementData) +new GenericArrayData(finalData) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + +val f = (left: String, right: String) => { + val expr = ctx.addReferenceObj("arraysAppendExpr", this) + s"${ev.value} = (ArrayData)$expr.nullSafeEval($left, $right);" +} + +val leftGen = left.genCode(ctx) +val rightGen = right.genCode(ctx) Review Comment: Personally, I think we should `override this behaviour` to append `null` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] infoankitp commented on a diff in pull request #38865: [SPARK-41232][SQL][PYTHON] Adding array_append function
infoankitp commented on code in PR #38865: URL: https://github.com/apache/spark/pull/38865#discussion_r1041824037 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala: ## @@ -4600,3 +4600,133 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL override protected def withNewChildrenInternal( newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = newLeft, right = newRight) } + +/** + * Given an array, and another element append the element at the end of the array. + */ +@ExpressionDescription( + usage = "_FUNC_(array, element) - Append the element", + examples = +""" +Examples: + > SELECT _FUNC_(array('b', 'd', 'c', 'a'), 'd'); + ["b","d","c","a","d"] + + """, + since = "3.4.0", + group = "array_funcs") +case class ArrayAppend(left: Expression, right: Expression) + extends BinaryExpression + with ImplicitCastInputTypes + with ComplexTypeMergingExpression + with QueryErrorsBase { + override def prettyName: String = "array_append" + + override def inputTypes: Seq[AbstractDataType] = { +(left.dataType, right.dataType) match { + case (ArrayType(e1, hasNull), e2) => +TypeCoercion.findTightestCommonType(e1, e2) match { + case Some(dt) => Seq(ArrayType(dt, hasNull), dt) + case _ => Seq.empty +} + case _ => Seq.empty +} + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +val value2 = right.eval(input) +if (value1 == null) { + null +} else { + nullSafeEval(value1, value2) +} + } + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (ArrayType(e1, _), e2) => if (e1.sameType(e2)) { +TypeCheckResult.TypeCheckSuccess + } + else { +DataTypeMismatch( + errorSubClass = "ARRAY_FUNCTION_DIFF_TYPES", + messageParameters = Map( +"functionName" -> toSQLId(prettyName), +"leftType" -> toSQLType(left.dataType), +"rightType" -> toSQLType(right.dataType), +"dataType" -> toSQLType(ArrayType) + )) + } + case _ => 
+DataTypeMismatch( + errorSubClass = "UNEXPECTED_INPUT_TYPE", + messageParameters = Map( +"paramIndex" -> "0", +"requiredType" -> toSQLType(ArrayType), +"inputSql" -> toSQLExpr(left), +"inputType" -> toSQLType(left.dataType) + ) +) +} + } + + protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): ArrayAppend = +copy(left = newLeft, right = newRight) + + override protected def nullSafeEval(input1: Any, input2: Any): Any = { +val arrayData = input1.asInstanceOf[ArrayData] +val arrayElementType = dataType.asInstanceOf[ArrayType].elementType +val elementData = input2 +val numberOfElements = arrayData.numElements() + 1 +if (numberOfElements > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw QueryExecutionErrors.concatArraysWithElementsExceedLimitError(numberOfElements) +} +val finalData = new Array[Any](numberOfElements) +arrayData.foreach(arrayElementType, finalData.update) +finalData.update(numberOfElements - 1, elementData) +new GenericArrayData(finalData) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + +val f = (left: String, right: String) => { + val expr = ctx.addReferenceObj("arraysAppendExpr", this) + s"${ev.value} = (ArrayData)$expr.nullSafeEval($left, $right);" +} + +val leftGen = left.genCode(ctx) +val rightGen = right.genCode(ctx) Review Comment: Nope the elements inside the src Array are not being translated to -1. 
```
val df10 = spark.createDataFrame(
  spark.sparkContext.parallelize(
    Seq(Row(Seq[Integer](1, 2, 3, null), null))),
  StructType(List(
    StructField("a", ArrayType.apply(IntegerType), true),
    StructField("b", IntegerType, true)
  ))
)
df10.selectExpr("array_append(a, b)").printSchema()
checkAnswer(df10.selectExpr("array_append(a, b)"),
  Seq(Row(Seq(1, 2, 3, null, null)))
)
```
Output
```
== Results ==
!== Correct Answer - 1 ==      == Spark Answer - 1 ==
!struct<>                      struct>
![List(1, 2, 3, null, null)]   [ArrayBuffer(1, 2, 3, null, -1)]
```
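For reference, a minimal sketch of the semantics the test above expects, where a null element is appended as null rather than replaced by a default such as -1. It uses plain Scala collections instead of Spark's `ArrayData`, and `appendElement` is a hypothetical helper, not the PR's `ArrayAppend` code.

```scala
object ArrayAppendSketch {
  // Hypothetical helper (an assumption for illustration, not Spark's implementation):
  // a null array yields null, while a null element is appended unchanged.
  def appendElement(array: Seq[Any], element: Any): Seq[Any] =
    if (array == null) null
    else array :+ element

  def main(args: Array[String]): Unit = {
    // Mirrors the input used in the test: an array ending in null, plus a null element.
    println(appendElement(Seq(1, 2, 3, null), null))
  }
}
```

Working on boxed values (`Any`) keeps null distinct from a primitive default, which is the distinction the failing comparison is about.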
[GitHub] [spark] beliefer commented on a diff in pull request #38672: [SPARK-41159][SQL] Optimize like any and like all expressions
beliefer commented on code in PR #38672: URL: https://github.com/apache/spark/pull/38672#discussion_r1041818122 ## sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/LikeAnyBenchmark.scala: ## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import java.io.File + +import scala.util.Random + +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.internal.SQLConf + +/** + * Benchmark to measure like any expressions performance. + * + * To run this benchmark: + * {{{ + * 1. without sbt: bin/spark-submit --class + * --jars , + * 2. build/sbt "sql/Test/runMain " + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain " + * Results will be written to "benchmarks/LikeAnyBenchmark-results.txt". + * }}} + */ +object LikeAnyBenchmark extends SqlBasedBenchmark { Review Comment: Does this mean there is a regression?
[GitHub] [spark] navinvishy commented on a diff in pull request #38947: [SPARK-41231][SQL] Adds an array_prepend function to catalyst
navinvishy commented on code in PR #38947: URL: https://github.com/apache/spark/pull/38947#discussion_r1041817673 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala: ## @@ -119,21 +117,24 @@ case class Size(child: Expression, legacySizeOfNull: Boolean) val value = child.eval(input) if (value == null) { if (legacySizeOfNull) -1 else null -} else child.dataType match { - case _: ArrayType => value.asInstanceOf[ArrayData].numElements() - case _: MapType => value.asInstanceOf[MapData].numElements() - case other => throw QueryExecutionErrors.unsupportedOperandTypeForSizeFunctionError(other) -} +} else Review Comment: Yeah there were too many of those. I've removed them now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] wankunde commented on a diff in pull request #38672: [SPARK-41159][SQL] Optimize like any and like all expressions
wankunde commented on code in PR #38672: URL: https://github.com/apache/spark/pull/38672#discussion_r1041811602 ## sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/LikeAnyBenchmark.scala: ## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import java.io.File + +import scala.util.Random + +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.internal.SQLConf + +/** + * Benchmark to measure like any expressions performance. + * + * To run this benchmark: + * {{{ + * 1. without sbt: bin/spark-submit --class + * --jars , + * 2. build/sbt "sql/Test/runMain " + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain " + * Results will be written to "benchmarks/LikeAnyBenchmark-results.txt". 
+ * }}} + */ +object LikeAnyBenchmark extends SqlBasedBenchmark { Review Comment: Before this PR:
```
[info] Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
[info] Multi like query:                       Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
[info]
[info] Query with multi like                            1393           1469         119         0.0     1392586.7       1.0X
[info] Query with LikeAny simplification                1244           1309          97         0.0     1244382.5       1.1X
[info] Query without LikeAny simplification              400            407           8         0.0      399924.3       3.5X

[info] Multi like query:                       Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
[info]
[info] Query with multi like                            1476           1576         149         0.0     1475710.1       1.0X
[info] Query with LikeAny simplification                1387           1429          37         0.0     1386669.1       1.1X
[info] Query without LikeAny simplification              430            470          35         0.0      430435.8       3.4X
```
After this PR:
```
[info] Multi like query:                       Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
[info]
[info] Query with multi like                            1441           1516          78         0.0     1441335.8       1.0X
[info] Query with LikeAny simplification                1401           1431          44         0.0     1400743.9       1.0X
[info] Query without LikeAny simplification              357            369          10         0.0      357419.8       4.0X

[info] Multi like query:                       Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
[info]
[info] Query with multi like                            1524           1628         117         0.0     1524119.6       1.0X
[info] Query with LikeAny simplification                1405           1418          18         0.0     1405258.7       1.1X
[info] Query without LikeAny simplification              362            372          12         0.0      361654.4       4.2X
```
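To make the two query shapes in the benchmark concrete, here is a rough sketch of how `col LIKE ANY (p1, p2, ...)` can be read as `col LIKE p1 OR col LIKE p2 OR ...`, with each simple pattern reduced to a plain string predicate. This is only an illustration in plain Scala, not Spark's `LikeSimplification` rule: `simplify` and `likeAny` are hypothetical names, and null handling and escape characters are deliberately left out.

```scala
object LikeAnySketch {
  // Turns a LIKE pattern into a string predicate when its only wildcards are
  // a leading and/or trailing '%'. Returns None for anything more complex
  // (inner '%', '_', or a backslash escape), which this sketch does not handle.
  def simplify(pattern: String): Option[String => Boolean] = {
    val leading = pattern.startsWith("%")
    val trailing = pattern.length > 1 && pattern.endsWith("%")
    val core = pattern.substring(
      if (leading) 1 else 0,
      pattern.length - (if (trailing) 1 else 0))
    if (core.exists(c => c == '%' || c == '_' || c == '\\')) None
    else Some((leading, trailing) match {
      case (false, false) => (s: String) => s == core
      case (false, true) => (s: String) => s.startsWith(core)
      case (true, false) => (s: String) => s.endsWith(core)
      case (true, true) => (s: String) => s.contains(core)
    })
  }

  // LIKE ANY as an OR over the simplified predicates; None means at least one
  // pattern could not be simplified and a general matcher would be needed.
  def likeAny(value: String, patterns: Seq[String]): Option[Boolean] = {
    val predicates = patterns.map(simplify)
    if (predicates.forall(_.isDefined)) Some(predicates.flatten.exists(p => p(value)))
    else None
  }
}
```

For example, `LikeAnySketch.likeAny("spark-sql", Seq("spark%", "%core"))` evaluates the same condition as `startsWith("spark") || endsWith("core")`.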
[GitHub] [spark] wineternity commented on pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen
wineternity commented on PR #38702: URL: https://github.com/apache/spark/pull/38702#issuecomment-1340463701
> The change looks good to me. +CC @Ngone51
>
> Btw, do you also want to remove the `if (event.taskInfo == null) {` check in beginning of `onTaskEnd` ?
>
> Make it a precondition check ? `Preconditions.checkNotNull(event.taskInfo)`

Yes, it can be changed to a precondition check. Maybe I can change it in a new PR after testing.
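A small sketch of the difference between the two styles discussed above. `TaskInfo`, `TaskEndEvent`, and both handler methods are simplified, hypothetical stand-ins rather than the real listener types, and `java.util.Objects.requireNonNull` is used here in place of Guava's `Preconditions.checkNotNull` so the example needs no extra dependency.

```scala
import java.util.Objects

object PreconditionSketch {
  // Simplified, hypothetical stand-ins for the listener event types.
  final case class TaskInfo(taskId: Long)
  final case class TaskEndEvent(taskInfo: TaskInfo)

  // Early-return style: an event with a null taskInfo is silently ignored.
  def onTaskEndLenient(event: TaskEndEvent): Option[Long] =
    if (event.taskInfo == null) None else Some(event.taskInfo.taskId)

  // Precondition style: a null taskInfo fails fast with a NullPointerException.
  def onTaskEndStrict(event: TaskEndEvent): Long = {
    Objects.requireNonNull(event.taskInfo, "event.taskInfo must not be null")
    event.taskInfo.taskId
  }
}
```

The precondition variant surfaces an unexpected null immediately instead of skipping the event, which is the behavioral change to test before making the switch.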
[GitHub] [spark] LuciferYang commented on a diff in pull request #38940: [WIP][SPARK-41409][CORE][SQL] Rename `_LEGACY_ERROR_TEMP_1043` to `INVALID_FUNCTION_ARGS`
LuciferYang commented on code in PR #38940: URL: https://github.com/apache/spark/pull/38940#discussion_r1041802953 ## sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala: ## @@ -638,10 +638,16 @@ class UDFSuite extends QueryTest with SharedSparkSession { } test("SPARK-28521 error message for CAST(parameter types contains DataType)") { -val e = intercept[AnalysisException] { - spark.sql("SELECT CAST(1)") -} -assert(e.getMessage.contains("Invalid arguments for function cast")) +checkError( + exception = intercept[AnalysisException] { +sql("SELECT CAST(1)") Review Comment: Do you have any suggestions on how to calculate `validParametersCount`, @MaxGekk?
[GitHub] [spark] sunchao commented on a diff in pull request #38924: [SPARK-41398][SQL] Relax constraints on Storage-Partitioned Join when partition keys after runtime filtering do not match
sunchao commented on code in PR #38924: URL: https://github.com/apache/spark/pull/38924#discussion_r1041789809 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala: ## @@ -114,8 +117,21 @@ case class BatchScanExec( // return an empty RDD with 1 partition if dynamic filtering removed the only split sparkContext.parallelize(Array.empty[InternalRow], 1) } else { + var finalPartitions = filteredPartitions + + outputPartitioning match { +case p: KeyGroupedPartitioning => + val partitionMapping = finalPartitions.map(s => +s.head.asInstanceOf[HasPartitionKey].partitionKey() -> s).toMap + finalPartitions = p.partitionValuesOpt.get.map { partKey => Review Comment: I was hoping to reuse this logic in #38950 too, which is not related to runtime filtering. If I move this close to the `if (oldRows.size < newRows.size)` check, I may have to duplicate it somewhere else?
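As an illustration of the logic under discussion, the sketch below aligns filtered partition groups with the full list of expected partition keys, substituting an empty group for any key that filtering removed. The quoted hunk is cut off after `partKey =>`, so the `getOrElse(..., Seq.empty)` step is an assumption about how it continues; `Split` and `alignToExpectedKeys` are hypothetical names, with a `String` key standing in for the real partition key type.

```scala
object EmptyFillSketch {
  // Hypothetical stand-in for an input split that exposes its partition key.
  final case class Split(partitionKey: String, rows: Seq[Int])

  // Returns one group per expected key, in the expected order. Keys whose
  // splits were filtered out get an empty group (assumed behavior).
  def alignToExpectedKeys(
      expectedKeys: Seq[String],
      filtered: Seq[Seq[Split]]): Seq[Seq[Split]] = {
    // Index the surviving groups by the key of their first split.
    val byKey = filtered
      .filter(_.nonEmpty)
      .map(group => group.head.partitionKey -> group)
      .toMap
    expectedKeys.map(key => byKey.getOrElse(key, Seq.empty))
  }
}
```

Keeping the output the same length and order as the expected keys is what would let both sides of a join keep matching partition layouts even when their filtered key sets differ.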
[GitHub] [spark] akpatnam25 commented on pull request #38959: [WIP] SPARK-41415: SASL Request Retries
akpatnam25 commented on PR #38959: URL: https://github.com/apache/spark/pull/38959#issuecomment-1340410484 cc @mridulm @otterc @zhouyejoe -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] akpatnam25 opened a new pull request, #38959: [WIP] SPARK-41415: SASL Request Retries
akpatnam25 opened a new pull request, #38959: URL: https://github.com/apache/spark/pull/38959 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] panbingkun commented on pull request #38937: [SPARK-41406][SQL] Refactor error message for `NUM_COLUMNS_MISMATCH` to make it more generic
panbingkun commented on PR #38937: URL: https://github.com/apache/spark/pull/38937#issuecomment-1340399634 cc @MaxGekk -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
HeartSaVioR commented on code in PR #38911: URL: https://github.com/apache/spark/pull/38911#discussion_r1041779007 ## connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala: ## @@ -316,6 +320,54 @@ private[kafka010] class KafkaMicroBatchStream( } } + private def assertEndOffsetForTriggerAvailableNow( + endPartitionOffsets: Map[TopicPartition, Long]): Unit = { +val tpsForPrefetched = allDataForTriggerAvailableNow.keySet +val tpsForEndOffset = endPartitionOffsets.keySet +assert(tpsForPrefetched == tpsForEndOffset, + "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in " + +"pre-fetched offset to end offset for each microbatch. " + +s"topic-partitions for pre-fetched offset: $tpsForPrefetched, " + +s"topic-partitions for end offset: $tpsForEndOffset.") + +val endOffsetHasGreaterOrEqualOffsetComparedToPrefetched = { + allDataForTriggerAvailableNow.keySet.forall { tp => +val offsetFromPrefetched = allDataForTriggerAvailableNow(tp) +val offsetFromEndOffset = endPartitionOffsets(tp) +offsetFromEndOffset <= offsetFromPrefetched + } +} +assert(endOffsetHasGreaterOrEqualOffsetComparedToPrefetched, Review Comment: I've just switched this to the error class framework (with a separate one for the Kafka data source) and made the errors non-internal (user-facing).
[GitHub] [spark] LuciferYang commented on a diff in pull request #38940: [WIP][SPARK-41409][CORE][SQL] Rename `_LEGACY_ERROR_TEMP_1043` to `INVALID_FUNCTION_ARGS`
LuciferYang commented on code in PR #38940: URL: https://github.com/apache/spark/pull/38940#discussion_r1041778431 ## sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala: ## @@ -638,10 +638,16 @@ class UDFSuite extends QueryTest with SharedSparkSession { } test("SPARK-28521 error message for CAST(parameter types contains DataType)") { -val e = intercept[AnalysisException] { - spark.sql("SELECT CAST(1)") -} -assert(e.getMessage.contains("Invalid arguments for function cast")) +checkError( + exception = intercept[AnalysisException] { +sql("SELECT CAST(1)") Review Comment: https://github.com/apache/spark/blob/37453ad5c85122d11b436813b4f5eddf615a6cf8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L137-L144 In this scenario, `validParametersCount` is also empty, so `WRONG_NUM_ARGS` cannot be reused for now.
[GitHub] [spark] cloud-fan commented on a diff in pull request #38924: [SPARK-41398][SQL] Relax constraints on Storage-Partitioned Join when partition keys after runtime filtering do not match
cloud-fan commented on code in PR #38924: URL: https://github.com/apache/spark/pull/38924#discussion_r1041775153 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala: ## @@ -114,8 +117,21 @@ case class BatchScanExec( // return an empty RDD with 1 partition if dynamic filtering removed the only split sparkContext.parallelize(Array.empty[InternalRow], 1) } else { + var finalPartitions = filteredPartitions + + outputPartitioning match { +case p: KeyGroupedPartitioning => + val partitionMapping = finalPartitions.map(s => +s.head.asInstanceOf[HasPartitionKey].partitionKey() -> s).toMap + finalPartitions = p.partitionValuesOpt.get.map { partKey => Review Comment: It's better to centralize the related code. This empty filling is quite related to the `if (oldRows.size < newRows.size)` check. We should put them together if possible -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38874: [SPARK-41235][SQL][PYTHON]High-order function: array_compact implementation
HyukjinKwon commented on code in PR #38874: URL: https://github.com/apache/spark/pull/38874#discussion_r1041775256 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala: ## @@ -4600,3 +4600,51 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL override protected def withNewChildrenInternal( newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = newLeft, right = newRight) } + +@ExpressionDescription( + usage = "_FUNC_(array) - Removes null values from the array.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3, null)); + [1,2,3] + """, + group = "array_funcs", + since = "3.4.0") +case class ArrayCompact(child: Expression) Review Comment: +1 for @zhengruifeng for now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon closed pull request #38957: [SPARK-41369][CONNECT][BUILD][FOLLOW-UP] Update connect server module name
HyukjinKwon closed pull request #38957: [SPARK-41369][CONNECT][BUILD][FOLLOW-UP] Update connect server module name URL: https://github.com/apache/spark/pull/38957 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon commented on pull request #38957: [SPARK-41369][CONNECT][BUILD][FOLLOW-UP] Update connect server module name
HyukjinKwon commented on PR #38957: URL: https://github.com/apache/spark/pull/38957#issuecomment-1340392569 Merged to master. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sandeep-katta commented on a diff in pull request #38874: [SPARK-41235][SQL][PYTHON]High-order function: array_compact implementation
sandeep-katta commented on code in PR #38874: URL: https://github.com/apache/spark/pull/38874#discussion_r1041772280 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala: ## @@ -4600,3 +4600,51 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL override protected def withNewChildrenInternal( newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = newLeft, right = newRight) } + +@ExpressionDescription( + usage = "_FUNC_(array) - Removes null values from the array.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3, null)); + [1,2,3] + """, + group = "array_funcs", + since = "3.4.0") +case class ArrayCompact(child: Expression) + extends UnaryExpression with ExpectsInputTypes with NullIntolerant { + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType) + override def dataType: DataType = child.dataType + + @transient private lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType + override def checkInputDataTypes(): TypeCheckResult = { +super.checkInputDataTypes() match { + case f if f.isFailure => f + case TypeCheckResult.TypeCheckSuccess => +TypeUtils.checkForOrderingExpr(elementType, prettyName) +} + } + + override def nullSafeEval(array: Any): Any = { +val newArray = new Array[Any](array.asInstanceOf[ArrayData].numElements()) Review Comment: I am slicing the array at the end like `new GenericArrayData(newArray.slice(0, pos))` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
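The approach described in the comment above (allocate an array of the input size, copy the non-null elements, then slice off the unused tail) can be sketched as follows. This uses plain Scala collections rather than Spark's `ArrayData` and `GenericArrayData`, and `compact` is a hypothetical helper, not the PR's `nullSafeEval`.

```scala
object ArrayCompactSketch {
  // Copies non-null elements into a buffer sized for the worst case,
  // then trims the buffer to the number of elements actually written.
  def compact(array: Seq[Any]): Seq[Any] = {
    val newArray = new Array[Any](array.length)
    var pos = 0
    array.foreach { elem =>
      if (elem != null) {
        newArray(pos) = elem
        pos += 1
      }
    }
    // Without the slice, the trailing slots would still hold nulls.
    newArray.slice(0, pos).toSeq
  }
}
```

The slice is what keeps the result from carrying trailing nulls when the input contained any, at the cost of one extra copy.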
[GitHub] [spark] AmplabJenkins commented on pull request #38937: [SPARK-41406][SQL] Refactor error message for `NUM_COLUMNS_MISMATCH` to make it more generic
AmplabJenkins commented on PR #38937: URL: https://github.com/apache/spark/pull/38937#issuecomment-1340389026 Can one of the admins verify this patch? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] beliefer commented on a diff in pull request #38672: [SPARK-41159][SQL] Optimize like any and like all expressions
beliefer commented on code in PR #38672: URL: https://github.com/apache/spark/pull/38672#discussion_r1041767560 ## sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/LikeAnyBenchmark.scala: ## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import java.io.File + +import scala.util.Random + +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.internal.SQLConf + +/** + * Benchmark to measure like any expressions performance. + * + * To run this benchmark: + * {{{ + * 1. without sbt: bin/spark-submit --class + * --jars , + * 2. build/sbt "sql/Test/runMain " + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain " + * Results will be written to "benchmarks/LikeAnyBenchmark-results.txt". + * }}} + */ +object LikeAnyBenchmark extends SqlBasedBenchmark { Review Comment: The benchmark cannot prove the performance improvement. Could you test with or without `MatchMultiHelper` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [spark] beliefer commented on a diff in pull request #38672: [SPARK-41159][SQL] Optimize like any and like all expressions
beliefer commented on code in PR #38672: URL: https://github.com/apache/spark/pull/38672#discussion_r1041765598 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala: ## @@ -248,13 +248,90 @@ case class ILike( } } +case class MatchMultiHelper( +isNotSpecified: Boolean, +partialMatch: Boolean, +patterns: Seq[UTF8String]) { Review Comment: `LikeSimplification` already has a similar optimization. Why do we need this class?
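To make the comparison with `LikeSimplification` concrete, here is a rough Python sketch of the kind of rewrite such a rule performs: simple `LIKE` patterns become prefix, suffix, substring, or equality checks, and a `like any` becomes an `OR` over those checks. It assumes patterns have no escape character, and the helper names `simplify_like` and `like_any` are hypothetical. It is not Spark's implementation.

```python
import re
from typing import Callable, List


def simplify_like(pattern: str) -> Callable[[str], bool]:
    """Turn a LIKE pattern into a cheap string predicate when possible."""
    leading = pattern.startswith("%")
    trailing = pattern.endswith("%")
    body = pattern.strip("%")
    if "%" in body or "_" in body:
        # Wildcards in the middle: fall back to a regex match.
        regex = re.compile(
            "".join(".*" if c == "%" else "." if c == "_" else re.escape(c)
                    for c in pattern),
            re.DOTALL,
        )
        return lambda s: regex.fullmatch(s) is not None
    if leading and trailing:
        return lambda s: body in s          # '%abc%' -> contains
    if leading:
        return lambda s: s.endswith(body)   # '%abc'  -> endsWith
    if trailing:
        return lambda s: s.startswith(body) # 'abc%'  -> startsWith
    return lambda s: s == body              # 'abc'   -> equality


def like_any(patterns: List[str]) -> Callable[[str], bool]:
    """Combine the simplified predicates with OR, as `like any` would."""
    predicates = [simplify_like(p) for p in patterns]
    return lambda s: any(p(s) for p in predicates)
```

For example, `like_any(["abc%", "%xyz"])("abcdef")` evaluates the first pattern as a prefix check and never builds a regex.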
[GitHub] [spark] zhengruifeng opened a new pull request, #38958: [SPARK-41433][CONNECT] Make Max Arrow BatchSize configurable
zhengruifeng opened a new pull request, #38958: URL: https://github.com/apache/spark/pull/38958 ### What changes were proposed in this pull request? Make the max Arrow batch size configurable. ### Why are the changes needed? To make the batch size configurable. ### Does this PR introduce _any_ user-facing change? Yes, one new configuration. ### How was this patch tested? Existing tests.
[GitHub] [spark] amaliujia closed pull request #38908: [SPARK-41384][CONNECT] Should use SQLExpression for str arguments in Projection
amaliujia closed pull request #38908: [SPARK-41384][CONNECT] Should use SQLExpression for str arguments in Projection URL: https://github.com/apache/spark/pull/38908 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] amaliujia commented on a diff in pull request #38899: [SPARK-41349][CONNECT] Implement DataFrame.hint
amaliujia commented on code in PR #38899: URL: https://github.com/apache/spark/pull/38899#discussion_r1041759325 ## connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala: ## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.planner + +import scala.collection.JavaConverters._ + +import org.apache.spark.connect.proto +import org.apache.spark.sql.catalyst.{expressions, InternalRow} +import org.apache.spark.sql.catalyst.expressions.{CreateArray, CreateMap, CreateStruct} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} + +object LiteralValueProtoConverter { + + /** + * Transforms the protocol buffers literals into the appropriate Catalyst literal expression. + * + * @return + * Expression Review Comment: Hmm, we actually enforce scalafmt for Spark Connect code, so if this passes scalafmt, that is fine. Only Spark Connect enforces it, so if this looks different from other modules, that is expected.
[GitHub] [spark] beliefer commented on a diff in pull request #38899: [SPARK-41349][CONNECT] Implement DataFrame.hint
beliefer commented on code in PR #38899: URL: https://github.com/apache/spark/pull/38899#discussion_r1041753682 ## connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala: ## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.planner + +import scala.collection.JavaConverters._ + +import org.apache.spark.connect.proto +import org.apache.spark.sql.catalyst.{expressions, InternalRow} +import org.apache.spark.sql.catalyst.expressions.{CreateArray, CreateMap, CreateStruct} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} + +object LiteralValueProtoConverter { + + /** + * Transforms the protocol buffers literals into the appropriate Catalyst literal expression. + * + * @return + * Expression Review Comment: Yeah. But it seems strange. cc @amaliujia -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side read metrics
mridulm commented on code in PR #36165: URL: https://github.com/apache/spark/pull/36165#discussion_r1041749200 ## core/src/test/resources/HistoryServerExpectations/excludeOnFailure_node_for_stage_expectation.json: ## @@ -81,7 +93,19 @@ "remoteBytesRead" : 0, "remoteBytesReadToDisk" : 0, "localBytesRead" : 0, - "recordsRead" : 0 + "recordsRead" : 0, + "pushBased": { Review Comment: Yes, that would be better. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] mridulm commented on a diff in pull request #38711: [SPARK-41192][Core] Remove unscheduled speculative tasks when task finished to obtain better dynamic
mridulm commented on code in PR #38711: URL: https://github.com/apache/spark/pull/38711#discussion_r1041745719 ## core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala: ## @@ -55,7 +55,8 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe @DeveloperApi case class SparkListenerSpeculativeTaskSubmitted( Review Comment: Since we are making an API change, given SPARK-37831, we should add `partitionId` as well. We can use either of them in `ExecutorAllocationManager` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sunchao commented on a diff in pull request #38924: [SPARK-41398][SQL] Relax constraints on Storage-Partitioned Join when partition keys after runtime filtering do not match
sunchao commented on code in PR #38924: URL: https://github.com/apache/spark/pull/38924#discussion_r1041745736 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala: ## @@ -114,8 +117,21 @@ case class BatchScanExec( // return an empty RDD with 1 partition if dynamic filtering removed the only split sparkContext.parallelize(Array.empty[InternalRow], 1) } else { + var finalPartitions = filteredPartitions + + outputPartitioning match { +case p: KeyGroupedPartitioning => + val partitionMapping = finalPartitions.map(s => +s.head.asInstanceOf[HasPartitionKey].partitionKey() -> s).toMap + finalPartitions = p.partitionValuesOpt.get.map { partKey => Review Comment: Hmm any obvious advantage of this? It looks the same to me 😅 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
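The snippet under discussion builds a map from partition key to its group of splits and then walks the expected partition values. The Python sketch below shows one reading of that logic. The quoted diff is truncated, so the empty-list fallback for keys that runtime filtering removed is an assumption, as are the function name `align_partitions` and the `(key, payload)` tuple representation of a split.

```python
from typing import Dict, Hashable, List, Sequence, Tuple

Split = Tuple[Hashable, str]  # (partition key, payload); hypothetical shape


def align_partitions(
    filtered: Sequence[List[Split]],
    expected_keys: Sequence[Hashable],
) -> List[List[Split]]:
    """Reorder split groups to follow expected_keys, one group per key."""
    # Every group is non-empty and all of its splits share one key,
    # so the first split identifies the group.
    by_key: Dict[Hashable, List[Split]] = {g[0][0]: g for g in filtered}
    # Assumption: a key that was filtered out maps to an empty group so that
    # both sides of the join keep the same number of partitions.
    return [by_key.get(key, []) for key in expected_keys]
```

The result always has one entry per expected key, in the expected order, regardless of how many groups survived filtering.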
[GitHub] [spark] amaliujia commented on pull request #38957: [SPARK-41369][CONNECT][BUILD][FOLLOW-UP] Update connect server module name
amaliujia commented on PR #38957: URL: https://github.com/apache/spark/pull/38957#issuecomment-1340358549 @HyukjinKwon @hvanhovell -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] amaliujia opened a new pull request, #38957: [SPARK-41369][CONNECT][BUILD][FOLLOW-UP] Update connect server module name
amaliujia opened a new pull request, #38957: URL: https://github.com/apache/spark/pull/38957 ### What changes were proposed in this pull request? The current Maven package does not show the connect server as the module name: https://user-images.githubusercontent.com/1938382/206088497-1e3af988-d6cb-473d-9e99-23f4827349ef.png This PR changes the name to reflect that it is a server module now. ### Why are the changes needed? To show the correct name when running Maven package. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? N/A
[GitHub] [spark] LuciferYang commented on a diff in pull request #38933: [SPARK-41404][SQL][TESTS] Refactor `ColumnVectorUtils#toBatch` to make `ColumnarBatchSuite#testRandomRows` test more dataType
LuciferYang commented on code in PR #38933: URL: https://github.com/apache/spark/pull/38933#discussion_r1041743714 ## sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java: ## @@ -165,7 +171,17 @@ private static void appendValue(WritableColumnVector dst, DataType t, Object o) dst.getChild(1).appendInt(c.days); dst.getChild(2).appendLong(c.microseconds); } else if (t instanceof DateType) { -dst.appendInt(DateTimeUtils.fromJavaDate((Date)o)); +if (o instanceof Date) { Review Comment: OK, changed to only test `Date` and `Timestamp`, since they are the defaults.
[GitHub] [spark] zhengruifeng commented on pull request #38921: [SPARK-41397][CONNECT][PYTHON] Implement part of string/binary functions
zhengruifeng commented on PR #38921: URL: https://github.com/apache/spark/pull/38921#issuecomment-1340345754 merged into master -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhengruifeng closed pull request #38921: [SPARK-41397][CONNECT][PYTHON] Implement part of string/binary functions
zhengruifeng closed pull request #38921: [SPARK-41397][CONNECT][PYTHON] Implement part of string/binary functions URL: https://github.com/apache/spark/pull/38921 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] wankunde commented on pull request #38672: [SPARK-41159][SQL] Optimize like any and like all expressions
wankunde commented on PR #38672: URL: https://github.com/apache/spark/pull/38672#issuecomment-1340342118 Hi, @beliefer @cloud-fan @wangyum Could you help to review this PR? Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] tedyu commented on pull request #38948: [SPARK-41419][K8S] Decrement PVC_COUNTER when the pod deletion happens
tedyu commented on PR #38948: URL: https://github.com/apache/spark/pull/38948#issuecomment-1340342014 In ExecutorPodsAllocatorSuite.scala, the pair of configs always have the following values: ``` .set(KUBERNETES_DRIVER_OWN_PVC.key, "true") .set(KUBERNETES_DRIVER_REUSE_PVC.key, "true") ``` If one of them is false, `requestNewExecutors` would proceed with creating pod and PVC. @dongjoon-hyun Do you know why there is no test with either of these configs as false ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] beliefer commented on a diff in pull request #38938: [WIP][SPARK-41403][CONNECT][PYTHON] Implement `DataFrame.describe`
beliefer commented on code in PR #38938: URL: https://github.com/apache/spark/pull/38938#discussion_r1041729116 ## python/pyspark/sql/connect/dataframe.py: ## @@ -1239,6 +1239,16 @@ def summary(self, *statistics: str) -> "DataFrame": session=self._session, ) +def describe(self, *cols: str) -> "DataFrame": +_cols: List[str] = list(cols) Review Comment: I will update it again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38914: [SPARK-41381][CONNECT][PYTHON] Implement `count_distinct` and `sum_distinct` functions
zhengruifeng commented on code in PR #38914: URL: https://github.com/apache/spark/pull/38914#discussion_r1041728393 ## python/pyspark/sql/tests/connect/test_connect_function.py: ## @@ -413,6 +412,22 @@ def test_aggregation_functions(self): sdf.groupBy("a").agg(SF.percentile_approx(sdf.b, [0.1, 0.9])).toPandas(), ) +# test count_distinct +self.assert_eq( +cdf.select(CF.count_distinct("b"), CF.count_distinct(cdf.c)).toPandas(), +sdf.select(SF.count_distinct("b"), SF.count_distinct(sdf.c)).toPandas(), +) +# The output column names of 'groupBy.agg(count_distinct)' in PySpark Review Comment: This incorrect column name does not seem easy to fix. Since it is not related to this PR, I changed the column names and compare only the values here.
[GitHub] [spark] dongjoon-hyun closed pull request #38949: [SPARK-41410][K8S][FOLLOWUP] Remove PVC_COUNTER decrement
dongjoon-hyun closed pull request #38949: [SPARK-41410][K8S][FOLLOWUP] Remove PVC_COUNTER decrement URL: https://github.com/apache/spark/pull/38949 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] dongjoon-hyun commented on pull request #38949: [SPARK-41410][K8S][FOLLOWUP] Remove PVC_COUNTER decrement
dongjoon-hyun commented on PR #38949: URL: https://github.com/apache/spark/pull/38949#issuecomment-1340334817 Thank you so much, @viirya . Merged to master. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #38949: [SPARK-41410][K8S][FOLLOWUP] Remove PVC_COUNTER decrement
dongjoon-hyun commented on code in PR #38949: URL: https://github.com/apache/spark/pull/38949#discussion_r1041724958 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala: ## @@ -455,7 +455,6 @@ class ExecutorPodsAllocator( .inNamespace(namespace) .resource(createdExecutorPod) .delete() - PVC_COUNTER.decrementAndGet() Review Comment: Since this is used for driver-owned PVCs, the deletion of pods is irrelevant to the number of PVC. That was my logical bug at the initial patch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
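The reasoning in the comment above is that a driver-owned PVC outlives the executor pod that used it, so deleting a pod should not change the PVC count. A toy Python model of that bookkeeping follows. The class `PvcCounter`, its methods, and the `max_pvcs` limit are all hypothetical names invented for this sketch and do not mirror `ExecutorPodsAllocator`.

```python
class PvcCounter:
    """Toy model: counts PVCs created so far against a fixed limit."""

    def __init__(self, max_pvcs: int) -> None:
        self.max_pvcs = max_pvcs
        self.count = 0

    def try_create_pvc(self) -> bool:
        """Create a new PVC only while the limit has not been reached."""
        if self.count >= self.max_pvcs:
            return False
        self.count += 1
        return True

    def on_pod_deleted(self) -> None:
        """Deleting a pod leaves its driver-owned PVC in place,
        so the counter is intentionally left unchanged."""
        return None
```

Decrementing in `on_pod_deleted` would let the model create more PVCs than the limit, which is the kind of over-allocation the removed `decrementAndGet()` call could cause under this reading.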
[GitHub] [spark] LuciferYang commented on a diff in pull request #38954: [SPARK-41417][CORE][SQL] Rename `_LEGACY_ERROR_TEMP_0019` to `CANNOT_PARSE_VALUE_TO_DATATYPE`
LuciferYang commented on code in PR #38954: URL: https://github.com/apache/spark/pull/38954#discussion_r1041724482 ## sql/core/src/test/resources/sql-tests/results/literals.sql.out: ## @@ -353,10 +353,12 @@ pattern% no-pattern\%pattern\% pattern\\% select '\'', '"', '\n', '\r', '\t', 'Z' -- !query schema struct<':string,":string, -:string,:string, :string,Z:string> +:string, +:string, :string,Z:string> -- !query output ' " - Z Review Comment: Fixed.
[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #38949: [SPARK-41410][K8S][FOLLOWUP] Remove PVC_COUNTER decrement
dongjoon-hyun commented on code in PR #38949: URL: https://github.com/apache/spark/pull/38949#discussion_r1041724358 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala: ## @@ -455,7 +455,6 @@ class ExecutorPodsAllocator( .inNamespace(namespace) .resource(createdExecutorPod) .delete() - PVC_COUNTER.decrementAndGet() Review Comment: Yes, right for case 1 and 2. So, this is a complete and minimal patch, @viirya . -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] gengliangwang commented on pull request #38882: [SPARK-41365][UI] Stages UI page fails to load for proxy in specific yarn environment
gengliangwang commented on PR #38882: URL: https://github.com/apache/spark/pull/38882#issuecomment-1340331429 @yabola I am not quite sure about the "yarn proxy" you mentioned. Can we fix the issue in a narrow waist method? IIUC there are also ajax requests in the executor page. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhengruifeng commented on pull request #38915: [SPARK-41382][CONNECT][PYTHON] Implement `product` function
zhengruifeng commented on PR #38915: URL: https://github.com/apache/spark/pull/38915#issuecomment-1340330948 merged into master, thank you for reviews
[GitHub] [spark] zhengruifeng closed pull request #38915: [SPARK-41382][CONNECT][PYTHON] Implement `product` function
zhengruifeng closed pull request #38915: [SPARK-41382][CONNECT][PYTHON] Implement `product` function URL: https://github.com/apache/spark/pull/38915
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38956: [DO_NOT_MERGE] Implement Column.{when, otherwise} and Function when with UnresolvedFunction
zhengruifeng commented on code in PR #38956: URL: https://github.com/apache/spark/pull/38956#discussion_r1041722230 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala: ## @@ -161,6 +161,15 @@ case class CaseWhen( elseValue: Option[Expression] = None) extends ComplexTypeMergingExpression with ConditionalExpression { + def this(branches: Seq[Expression]) = this( Review Comment: still fail with: ``` grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.UNKNOWN details = "[FAILED_FUNCTION_CALL] Failed preparing of the function `when` for call. Please, double check function's arguments." debug_error_string = "{"created":"@1670384303.828721000","description":"Error received from peer ipv6:[::1]:15002","file":"src/core/lib/surface/call.cc","file_line":1064,"grpc_message":"[FAILED_FUNCTION_CALL] Failed preparing of the function `when` for call. Please, double check function's arguments.","grpc_status":2}" > ```
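The auxiliary constructor in the hunk above takes a flat `Seq[Expression]`, but its body is cut off in the diff. As a rough illustration only, here is a Python sketch of the regrouping such a constructor would need, assuming the arguments arrive as condition, value, condition, value, ... with an optional trailing else value. `group_when_args` is a hypothetical helper, not part of Spark.

```python
from typing import List, Optional, Sequence, Tuple, TypeVar

T = TypeVar("T")


def group_when_args(args: Sequence[T]) -> Tuple[List[Tuple[T, T]], Optional[T]]:
    """Split a flat [cond, value, cond, value, ..., else?] list into branches.

    Hypothetical helper: the layout of the flat list is an assumption.
    """
    if len(args) < 2:
        raise ValueError("when() needs at least one (condition, value) pair")
    # Number of leading arguments that form complete (condition, value) pairs.
    paired = len(args) - (len(args) % 2)
    branches = [(args[i], args[i + 1]) for i in range(0, paired, 2)]
    # An odd argument count means the last argument is the else value.
    else_value = args[-1] if len(args) % 2 == 1 else None
    return branches, else_value
```

Under that assumption, `when(c, v)` maps to one branch and no else value, while `when(c, v).otherwise(e)` maps to one branch plus `e`.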
[GitHub] [spark] cloud-fan commented on a diff in pull request #38924: [SPARK-41398][SQL] Relax constraints on Storage-Partitioned Join when partition keys after runtime filtering do not match
cloud-fan commented on code in PR #38924: URL: https://github.com/apache/spark/pull/38924#discussion_r1041721325 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala: ## @@ -114,8 +117,21 @@ case class BatchScanExec( // return an empty RDD with 1 partition if dynamic filtering removed the only split sparkContext.parallelize(Array.empty[InternalRow], 1) } else { + var finalPartitions = filteredPartitions + + outputPartitioning match { +case p: KeyGroupedPartitioning => + val partitionMapping = finalPartitions.map(s => +s.head.asInstanceOf[HasPartitionKey].partitionKey() -> s).toMap + finalPartitions = p.partitionValuesOpt.get.map { partKey => Review Comment: shall we move this logic to `lazy val filteredPartitions`?
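The hunk above builds a map from partition key to split group and then walks the original partition values, but the lambda body is truncated in the diff. The Python sketch below shows one plausible reading, assuming keys that were filtered out at runtime are replaced by an empty group so both sides of the join keep the same number of partitions. `align_partitions` and `key_of` are hypothetical names, and the empty-group fallback is an assumption rather than something the diff confirms.

```python
from typing import Callable, Dict, Hashable, List, Sequence, TypeVar

S = TypeVar("S")


def align_partitions(
    expected_keys: Sequence[Hashable],
    filtered_groups: Sequence[List[S]],
    key_of: Callable[[S], Hashable],
) -> List[List[S]]:
    """Return one group per expected key, in the expected order.

    Each group in `filtered_groups` must be non-empty; its first element
    supplies the partition key, mirroring `s.head` in the diff.
    """
    by_key: Dict[Hashable, List[S]] = {
        key_of(group[0]): group for group in filtered_groups
    }
    # Assumption: a key with no surviving splits maps to an empty group.
    return [by_key.get(key, []) for key in expected_keys]
```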
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38956: [DO_NOT_MERGE] Implement Column.{when, otherwise} and Function when with UnresolvedFunction
zhengruifeng commented on code in PR #38956: URL: https://github.com/apache/spark/pull/38956#discussion_r1041717834 ## python/pyspark/sql/connect/column.py: ## @@ -129,6 +140,53 @@ def name(self) -> str: ... +class CaseWhen(Expression): +def __init__( +self, branches: Sequence[Tuple[Expression, Expression]], else_value: Optional[Expression] +): + +assert isinstance(branches, list) +for branch in branches: +assert ( +isinstance(branch, tuple) +and len(branch) == 2 +and all(isinstance(expr, Expression) for expr in branch) +) +self._branches = branches + +if else_value is not None: +assert isinstance(else_value, Expression) + +self._else_value = else_value + +def to_plan(self, session: "SparkConnectClient") -> "proto.Expression": +# fun = proto.Expression() +# fun.unresolved_function.function_name = "when" +# for condition, value in self._branches: +# fun.unresolved_function.arguments.extend(condition.to_plan(session)) +# fun.unresolved_function.arguments.extend(value.to_plan(session)) +# if self._else_value is not None: +# fun.unresolved_function.arguments.extend(self._else_value.to_plan(session)) +# return fun + +args: Sequence[Expression] = [] +for condition, value in self._branches: +args.append(condition) +args.append(value) + +if self._else_value is not None: +args.append(self._else_value) + +unresolved_function = UnresolvedFunction(name="when", args=args) + +return unresolved_function.to_plan(session) Review Comment: let me add another constructor
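For readers following the `to_plan` logic in the hunk above, here is a standalone sketch of the flattening step it performs before handing the arguments to `UnresolvedFunction`. `flatten_when_args` is a hypothetical free function written for illustration; it uses plain values in place of `Expression` objects and does not touch the Spark Connect protobuf API.

```python
from typing import List, Optional, Sequence, Tuple, TypeVar

T = TypeVar("T")


def flatten_when_args(
    branches: Sequence[Tuple[T, T]], else_value: Optional[T] = None
) -> List[T]:
    """Flatten (condition, value) branches into [cond, value, ..., else?]."""
    # Annotated as List rather than Sequence: typing.Sequence declares no
    # append(), so a static type checker would flag the calls below.
    args: List[T] = []
    for condition, value in branches:
        args.append(condition)
        args.append(value)
    if else_value is not None:
        args.append(else_value)
    return args
```

One consequence of this flat layout is that the receiver can only tell an else value apart from a branch by the parity of the argument count.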
[GitHub] [spark] LuciferYang commented on a diff in pull request #38865: [SPARK-41232][SQL][PYTHON] Adding array_append function
LuciferYang commented on code in PR #38865: URL: https://github.com/apache/spark/pull/38865#discussion_r1041716971 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala: ## @@ -4600,3 +4600,133 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL override protected def withNewChildrenInternal( newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = newLeft, right = newRight) } + +/** + * Given an array, and another element append the element at the end of the array. + */ +@ExpressionDescription( + usage = "_FUNC_(array, element) - Append the element", + examples = +""" +Examples: + > SELECT _FUNC_(array('b', 'd', 'c', 'a'), 'd'); + ["b","d","c","a","d"] + + """, + since = "3.4.0", + group = "array_funcs") +case class ArrayAppend(left: Expression, right: Expression) + extends BinaryExpression + with ImplicitCastInputTypes + with ComplexTypeMergingExpression + with QueryErrorsBase { + override def prettyName: String = "array_append" + + override def inputTypes: Seq[AbstractDataType] = { +(left.dataType, right.dataType) match { + case (ArrayType(e1, hasNull), e2) => +TypeCoercion.findTightestCommonType(e1, e2) match { + case Some(dt) => Seq(ArrayType(dt, hasNull), dt) + case _ => Seq.empty +} + case _ => Seq.empty +} + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +val value2 = right.eval(input) +if (value1 == null) { + null +} else { + nullSafeEval(value1, value2) +} + } + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (ArrayType(e1, _), e2) => if (e1.sameType(e2)) { +TypeCheckResult.TypeCheckSuccess + } + else { +DataTypeMismatch( + errorSubClass = "ARRAY_FUNCTION_DIFF_TYPES", + messageParameters = Map( +"functionName" -> toSQLId(prettyName), +"leftType" -> toSQLType(left.dataType), +"rightType" -> toSQLType(right.dataType), +"dataType" -> toSQLType(ArrayType) + )) + } + case _ => 
+DataTypeMismatch( + errorSubClass = "UNEXPECTED_INPUT_TYPE", + messageParameters = Map( +"paramIndex" -> "0", +"requiredType" -> toSQLType(ArrayType), +"inputSql" -> toSQLExpr(left), +"inputType" -> toSQLType(left.dataType) + ) +) +} + } + + protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): ArrayAppend = +copy(left = newLeft, right = newRight) + + override protected def nullSafeEval(input1: Any, input2: Any): Any = { +val arrayData = input1.asInstanceOf[ArrayData] +val arrayElementType = dataType.asInstanceOf[ArrayType].elementType +val elementData = input2 +val numberOfElements = arrayData.numElements() + 1 +if (numberOfElements > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw QueryExecutionErrors.concatArraysWithElementsExceedLimitError(numberOfElements) +} +val finalData = new Array[Any](numberOfElements) +arrayData.foreach(arrayElementType, finalData.update) +finalData.update(numberOfElements - 1, elementData) +new GenericArrayData(finalData) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + +val f = (left: String, right: String) => { + val expr = ctx.addReferenceObj("arraysAppendExpr", this) + s"${ev.value} = (ArrayData)$expr.nullSafeEval($left, $right);" +} + +val leftGen = left.genCode(ctx) +val rightGen = right.genCode(ctx) Review Comment: > @LuciferYang One more thing I noticed, if we are having a null int value, it is getting translated to -1. It is picking up the default value for Integer primitive type, in the JavaCode. I think we should override this behaviour right ? Will `null` in the src Array also be translated to `-1`?
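As a reference point for the null question above, this Python sketch mirrors the interpreted `eval` path shown in the diff: a null array yields null, while a null element is appended as-is. It is a simplified model with `None` standing in for SQL null, and it leaves out the array length limit check. It says nothing about the generated-code path, which is where the quoted comment reports the `-1`.

```python
from typing import List, Optional, TypeVar

T = TypeVar("T")


def array_append(array: Optional[List[Optional[T]]], element: Optional[T]) -> Optional[List[Optional[T]]]:
    """Model of the interpreted semantics in the diff (None stands for null)."""
    if array is None:
        # Matches `if (value1 == null) null` in eval.
        return None
    # Copy the source elements, nulls included, then append the new element
    # even when it is null, as nullSafeEval does with finalData.update.
    return list(array) + [element]
```

A codegen implementation would need to produce the same results for these cases, including nulls already present in the source array.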
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38956: [DO_NOT_MERGE] Implement Column.{when, otherwise} and Function when with UnresolvedFunction
zhengruifeng commented on code in PR #38956: URL: https://github.com/apache/spark/pull/38956#discussion_r1041716422 ## python/pyspark/sql/connect/column.py: ## @@ -129,6 +140,53 @@ def name(self) -> str: ... +class CaseWhen(Expression): +def __init__( +self, branches: Sequence[Tuple[Expression, Expression]], else_value: Optional[Expression] +): + +assert isinstance(branches, list) +for branch in branches: +assert ( +isinstance(branch, tuple) +and len(branch) == 2 +and all(isinstance(expr, Expression) for expr in branch) +) +self._branches = branches + +if else_value is not None: +assert isinstance(else_value, Expression) + +self._else_value = else_value + +def to_plan(self, session: "SparkConnectClient") -> "proto.Expression": +# fun = proto.Expression() +# fun.unresolved_function.function_name = "when" +# for condition, value in self._branches: +# fun.unresolved_function.arguments.extend(condition.to_plan(session)) +# fun.unresolved_function.arguments.extend(value.to_plan(session)) +# if self._else_value is not None: +# fun.unresolved_function.arguments.extend(self._else_value.to_plan(session)) +# return fun + +args: Sequence[Expression] = [] +for condition, value in self._branches: +args.append(condition) +args.append(value) + +if self._else_value is not None: +args.append(self._else_value) + +unresolved_function = UnresolvedFunction(name="when", args=args) + +return unresolved_function.to_plan(session) Review Comment: @hvanhovell @amaliujia
[GitHub] [spark] sunchao commented on a diff in pull request #38924: [SPARK-41398][SQL] Relax constraints on Storage-Partitioned Join when partition keys after runtime filtering do not match
sunchao commented on code in PR #38924: URL: https://github.com/apache/spark/pull/38924#discussion_r1041716400 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala: ## @@ -81,18 +81,21 @@ case class BatchScanExec( val newRows = new InternalRowSet(p.expressions.map(_.dataType)) newRows ++= newPartitions.map(_.asInstanceOf[HasPartitionKey].partitionKey()) - val oldRows = p.partitionValuesOpt.get - if (oldRows.size != newRows.size) { -throw new SparkException("Data source must have preserved the original partitioning " + -"during runtime filtering: the number of unique partition values obtained " + -s"through HasPartitionKey changed: before ${oldRows.size}, after ${newRows.size}") + val oldRows = p.partitionValuesOpt.get.toSet Review Comment: Oops you're right! I forgot about `InternalRowSet`. Let me create a follow-up.
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38956: [DO_NOT_MERGE] Implement Column.{when, otherwise} and Function when with UnresolvedFunction
zhengruifeng commented on code in PR #38956: URL: https://github.com/apache/spark/pull/38956#discussion_r1041716023 ## python/pyspark/sql/connect/column.py: ## @@ -129,6 +140,53 @@ def name(self) -> str: ... +class CaseWhen(Expression): +def __init__( +self, branches: Sequence[Tuple[Expression, Expression]], else_value: Optional[Expression] +): + +assert isinstance(branches, list) +for branch in branches: +assert ( +isinstance(branch, tuple) +and len(branch) == 2 +and all(isinstance(expr, Expression) for expr in branch) +) +self._branches = branches + +if else_value is not None: +assert isinstance(else_value, Expression) + +self._else_value = else_value + +def to_plan(self, session: "SparkConnectClient") -> "proto.Expression": +# fun = proto.Expression() +# fun.unresolved_function.function_name = "when" +# for condition, value in self._branches: +# fun.unresolved_function.arguments.extend(condition.to_plan(session)) +# fun.unresolved_function.arguments.extend(value.to_plan(session)) +# if self._else_value is not None: +# fun.unresolved_function.arguments.extend(self._else_value.to_plan(session)) +# return fun + +args: Sequence[Expression] = [] +for condition, value in self._branches: +args.append(condition) +args.append(value) + +if self._else_value is not None: +args.append(self._else_value) + +unresolved_function = UnresolvedFunction(name="when", args=args) + +return unresolved_function.to_plan(session) Review Comment: both `cdf.select(CF.when(cdf.a == 0, 1.0))` and `cdf.select(CF.when(cdf.a == 0, 1.0).otherwise(2.0))` fail with this error: ``` grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.UNKNOWN details = "Invalid arguments for function when." 
debug_error_string = "{"created":"@1670381875.743425000","description":"Error received from peer ipv6:[::1]:15002","file":"src/core/lib/surface/call.cc","file_line":1064,"grpc_message":"Invalid arguments for function when.","grpc_status":2}" ```
[GitHub] [spark] zhengruifeng opened a new pull request, #38956: [DO_NOT_MERGE] Implement Column.{when, otherwise} and Function when with UnresolvedFunction
zhengruifeng opened a new pull request, #38956: URL: https://github.com/apache/spark/pull/38956 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested?