[PR] [WIP] test java toUpperCase & toLowerCase [spark]
panbingkun opened a new pull request, #46147: URL: https://github.com/apache/spark/pull/46147 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-47845][SQL][PYTHON][CONNECT] Support Column type in split function for scala and python [spark]
CTCC1 commented on code in PR #46045: URL: https://github.com/apache/spark/pull/46045#discussion_r1573145872

## python/pyspark/sql/connect/functions/builtin.py:
## @@ -2476,8 +2476,26 @@ def repeat(col: "ColumnOrName", n: Union["ColumnOrName", int]) -> Column:
 repeat.__doc__ = pysparkfuncs.repeat.__doc__

-def split(str: "ColumnOrName", pattern: str, limit: int = -1) -> Column:
-    return _invoke_function("split", _to_col(str), lit(pattern), lit(limit))
+def split(
+    str: "ColumnOrName",
+    pattern: Union[Column, str],
+    limit: Union["ColumnOrName", int] = -1,
+) -> Column:
+    # work around shadowing of str in the input variable name
+    from builtins import str as py_str
+
+    if isinstance(pattern, py_str):
+        _pattern = lit(pattern)
+    elif isinstance(pattern, Column):
+        _pattern = pattern
+    else:
+        raise PySparkTypeError(
+            error_class="NOT_COLUMN_OR_STR",
+            message_parameters={"arg_name": "pattern", "arg_type": type(pattern).__name__},
+        )
+
+    limit = lit(limit) if isinstance(limit, int) else _to_col(limit)
+    return _invoke_function("split", _to_col(str), _pattern, limit)

Review Comment: Sure. I removed the type check now. Maybe in the future we can standardize this, e.g. with a decorator that inspects the function signature and does type checks accordingly.
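The decorator idea floated in this comment could look roughly like the following. This is a minimal, hypothetical sketch: the `check_arg_types` name is made up, a plain `TypeError` stands in for PySpark's `PySparkTypeError` error-class machinery, and string forward references such as `"ColumnOrName"` are deliberately skipped.

```python
import functools
import inspect
from typing import Union, get_args, get_origin


def check_arg_types(func):
    """Hypothetical decorator: validate annotated arguments against their type hints."""
    sig = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = sig.bind(*args, **kwargs)
        bound.apply_defaults()
        for name, value in bound.arguments.items():
            annotation = sig.parameters[name].annotation
            if annotation is inspect.Parameter.empty:
                continue
            # Expand Union[X, Y] into its member types; plain types pass through.
            candidates = get_args(annotation) if get_origin(annotation) is Union else (annotation,)
            # Keep only runtime-checkable classes; string forward references
            # like "ColumnOrName" are skipped rather than checked.
            checkable = tuple(t for t in candidates if isinstance(t, type))
            if checkable and not isinstance(value, checkable):
                raise TypeError(f"{name} must be one of {checkable}, got {type(value).__name__}")
        return func(*args, **kwargs)

    return wrapper


@check_arg_types
def split(s: str, pattern: Union[str, int], limit: int = -1) -> list:
    return s.split(str(pattern))

split("a,b,c", ",")    # ok
# split("a,b,c", 1.5)  # would raise: pattern must be one of (str, int), got float
```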
Re: [PR] [SPARK-47845][SQL][PYTHON][CONNECT] Support Column type in split function for scala and python [spark]
liucao-dd commented on code in PR #46045: URL: https://github.com/apache/spark/pull/46045#discussion_r1573144905

## python/pyspark/sql/connect/functions/builtin.py:
## @@ -2476,8 +2476,26 @@ def repeat(col: "ColumnOrName", n: Union["ColumnOrName", int]) -> Column:
 repeat.__doc__ = pysparkfuncs.repeat.__doc__

-def split(str: "ColumnOrName", pattern: str, limit: int = -1) -> Column:
-    return _invoke_function("split", _to_col(str), lit(pattern), lit(limit))
+def split(
+    str: "ColumnOrName",
+    pattern: Union[Column, str],
+    limit: Union["ColumnOrName", int] = -1,
+) -> Column:
+    # work around shadowing of str in the input variable name
+    from builtins import str as py_str
+
+    if isinstance(pattern, py_str):
+        _pattern = lit(pattern)
+    elif isinstance(pattern, Column):
+        _pattern = pattern
+    else:
+        raise PySparkTypeError(
+            error_class="NOT_COLUMN_OR_STR",
+            message_parameters={"arg_name": "pattern", "arg_type": type(pattern).__name__},
+        )
+
+    limit = lit(limit) if isinstance(limit, int) else _to_col(limit)
+    return _invoke_function("split", _to_col(str), _pattern, limit)

Review Comment: Sure. I removed the type check now. Maybe in the future we can standardize this, e.g. with a decorator that inspects the function signature and does type checks accordingly.
Re: [PR] [SPARK-47925][SQL][TESTS] Mark `BloomFilterAggregateQuerySuite` as `ExtendedSQLTest` [spark]
dongjoon-hyun closed pull request #46145: [SPARK-47925][SQL][TESTS] Mark `BloomFilterAggregateQuerySuite` as `ExtendedSQLTest` URL: https://github.com/apache/spark/pull/46145
Re: [PR] [SPARK-47925][SQL][TESTS] Mark `BloomFilterAggregateQuerySuite` as `ExtendedSQLTest` [spark]
dongjoon-hyun commented on PR #46145: URL: https://github.com/apache/spark/pull/46145#issuecomment-2067526908 Thank you, @HyukjinKwon . Merged to master.
Re: [PR] [SPARK-47924][CORE] Add a DEBUG log to `DiskStore.moveFileToBlock` [spark]
dongjoon-hyun closed pull request #46144: [SPARK-47924][CORE] Add a DEBUG log to `DiskStore.moveFileToBlock` URL: https://github.com/apache/spark/pull/46144
Re: [PR] [SPARK-47923][R] Upgrade the minimum version of `arrow` R package to 10.0.0 [spark]
dongjoon-hyun closed pull request #46142: [SPARK-47923][R] Upgrade the minimum version of `arrow` R package to 10.0.0 URL: https://github.com/apache/spark/pull/46142
Re: [PR] [SPARK-47924][CORE] Add a DEBUG log to `DiskStore.moveFileToBlock` [spark]
dongjoon-hyun commented on PR #46144: URL: https://github.com/apache/spark/pull/46144#issuecomment-2067525455 Thank you, @HyukjinKwon . Merged to master.
Re: [PR] [SPARK-47923][R] Upgrade the minimum version of `arrow` R package to 10.0.0 [spark]
dongjoon-hyun commented on PR #46142: URL: https://github.com/apache/spark/pull/46142#issuecomment-2067525074 Thank you, @HyukjinKwon ! Merged to master.
Re: [PR] [SPARK-47907] Put bang under a config [spark]
srielau commented on PR #46138: URL: https://github.com/apache/spark/pull/46138#issuecomment-2067511478 @cloud-fan @gengliangwang This is ready for review.
Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45977: URL: https://github.com/apache/spark/pull/45977#discussion_r1573100752

## python/pyspark/sql/worker/plan_data_source_read.py:
## @@ -51,6 +52,71 @@
 )

+def records_to_arrow_batches(
+    output_iter: Iterator[Tuple],
+    max_arrow_batch_size: int,
+    return_type: StructType,
+    data_source: DataSource,
+) -> Iterable[pa.RecordBatch]:

Review Comment: docstring added.

## python/pyspark/sql/datasource.py:
## @@ -469,6 +501,200 @@ def stop(self) -> None:
 ...

+class SimpleInputPartition(InputPartition):
+    def __init__(self, start: dict, end: dict):
+        self.start = start
+        self.end = end
+
+
+class PrefetchedCacheEntry(InputPartition):
+    def __init__(self, start: dict, end: dict, it: Iterator[Tuple]):
+        self.start = start
+        self.end = end
+        self.it = it
+
+
+class SimpleDataSourceStreamReader(ABC):
+    """
+    A base class for simplified streaming data source readers.
+    Compared to :class:`DataSourceStreamReader`, :class:`SimpleDataSourceStreamReader` doesn't
+    require planning data partition. Also, the read api of :class:`SimpleDataSourceStreamReader`
+    allows reading data and planning the latest offset at the same time.
+
+    .. versionadded: 4.0.0
+    """
+
+    def initialOffset(self) -> dict:
+        """
+        Return the initial offset of the streaming data source.
+        A new streaming query starts reading data from the initial offset.
+        If Spark is restarting an existing query, it will restart from the check-pointed offset
+        rather than the initial one.
+
+        Returns
+        -------
+        dict
+            A dict or recursive dict whose key and value are primitive types, which includes
+            Integer, String and Boolean.
+
+        Examples
+        --------
+        >>> def initialOffset(self):
+        ...     return {"parititon-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}}
+        """
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": "initialOffset"},
+        )
+
+    def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+        """
+        Read all available data from start offset and return the offset that next read attempt
+        starts from.
+
+        Parameters
+        ----------
+        start : dict
+            The start offset to start reading from.
+
+        Returns
+        -------
+        A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+            The iterator contains all the available records after start offset.
+            The dict is the end offset of this read attempt and the start of next read attempt.
+        """
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": "read"},
+        )
+
+    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+        """
+        Read all available data from specific start offset and end offset.
+        This is invoked during failure recovery to re-read a batch deterministically
+        in order to achieve exactly once.
+
+        Parameters
+        ----------
+        start : dict
+            The start offset to start reading from.
+
+        end : dict
+            The offset where the reading stop.
+
+        Returns
+        -------
+        iterator of :class:`Tuple`\\s
+            All the records between start offset and end offset.
+        """
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": "read2"},

Review Comment: Fixed.
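To make the simplified contract quoted above concrete, here is a hedged, self-contained toy reader that follows the same method shapes. A real source would subclass `SimpleDataSourceStreamReader` from `pyspark.sql.datasource`; the counter logic and all names below are illustrative only.

```python
from typing import Iterator, Tuple


class CounterStreamReader:
    """Toy reader following the SimpleDataSourceStreamReader contract above."""

    def initialOffset(self) -> dict:
        return {"offset": 0}

    def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
        # Read everything available and report the end offset in one call;
        # here we pretend five new rows appear on every read attempt.
        end = {"offset": start["offset"] + 5}
        rows = ((i,) for i in range(start["offset"], end["offset"]))
        return rows, end

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        # Deterministic replay between two offsets, used for failure recovery.
        return ((i,) for i in range(start["offset"], end["offset"]))

    def commit(self, end: dict) -> None:
        # Nothing to clean up for this in-memory example.
        pass


reader = CounterStreamReader()
batch, next_offset = reader.read(reader.initialOffset())
assert list(batch) == [(0,), (1,), (2,), (3,), (4,)] and next_offset == {"offset": 5}
```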
Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45977: URL: https://github.com/apache/spark/pull/45977#discussion_r1573100690 ## python/pyspark/sql/datasource.py: ## @@ -469,6 +501,200 @@ def stop(self) -> None: ... +class SimpleInputPartition(InputPartition): +def __init__(self, start: dict, end: dict): +self.start = start +self.end = end + + +class PrefetchedCacheEntry(InputPartition): +def __init__(self, start: dict, end: dict, it: Iterator[Tuple]): Review Comment: Fixed. ## python/pyspark/sql/datasource.py: ## @@ -469,6 +501,200 @@ def stop(self) -> None: ... +class SimpleInputPartition(InputPartition): +def __init__(self, start: dict, end: dict): +self.start = start +self.end = end + + +class PrefetchedCacheEntry(InputPartition): +def __init__(self, start: dict, end: dict, it: Iterator[Tuple]): +self.start = start +self.end = end +self.it = it + + +class SimpleDataSourceStreamReader(ABC): +""" +A base class for simplified streaming data source readers. +Compared to :class:`DataSourceStreamReader`, :class:`SimpleDataSourceStreamReader` doesn't +require planning data partition. Also, the read api of :class:`SimpleDataSourceStreamReader` +allows reading data and planning the latest offset at the same time. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict or recursive dict whose key and value are primitive types, which includes +Integer, String and Boolean. + +Examples + +>>> def initialOffset(self): +... return {"parititon-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}} +""" +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "initialOffset"}, +) + +def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]: +""" +Read all available data from start offset and return the offset that next read attempt +starts from. + +Parameters +-- +start : dict +The start offset to start reading from. + +Returns +--- +A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s +The iterator contains all the available records after start offset. +The dict is the end offset of this read attempt and the start of next read attempt. +""" +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "read"}, +) + +def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]: +""" +Read all available data from specific start offset and end offset. +This is invoked during failure recovery to re-read a batch deterministically +in order to achieve exactly once. + +Parameters +-- +start : dict +The start offset to start reading from. + +end : dict +The offset where the reading stop. + +Returns +--- +iterator of :class:`Tuple`\\s +All the records between start offset and end offset. +""" +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "read2"}, +) + +def commit(self, end: dict) -> None: +""" +Informs the source that Spark has completed processing all data for offsets less than or +equal to `end` and will only request offsets greater than `end` in the future. + +Parameters +-- +end : dict +The latest offset that the streaming query has processed for this source. +""" +... 
+ + +class _SimpleStreamReaderWrapper(DataSourceStreamReader): +""" +A private class that wrap :class:`SimpleDataSourceStreamReader` in prefetch and cache pattern, +so that :class:`SimpleDataSourceStreamReader` can integrate with streaming engine like an +ordinary :class:`DataSourceStreamReader`. + +current_offset tracks the latest progress of the record prefetching, it is initialized to be +initialOffset() when query start for the first time or initialized to be the end offset of +the last committed batch when query restarts. + +When streaming engine calls latestOffset(), the wrapper calls read() that starts from +current_offset, prefetches and cache the data, then updates the
Re: [PR] [SPARK-47148][SQL] Avoid to materialize AQE ExchangeQueryStageExec on the cancellation [spark]
erenavsarogullari commented on code in PR #45234: URL: https://github.com/apache/spark/pull/45234#discussion_r1573096494 ## sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala: ## @@ -897,6 +900,85 @@ class AdaptiveQueryExecSuite } } + test("SPARK-47148: AQE should avoid to materialize ShuffleQueryStage on the cancellation") { +try { + spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil + withSQLConf( +SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", +SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true", +SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { +val joinedDF = createJoinedDF() + +val error = intercept[SparkException] { + joinedDF.collect() +} +assert(error.getMessage() contains "ProblematicCoalesce execution is failed") + +val adaptivePlan = joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec] + +// All QueryStages should be based on ShuffleQueryStageExec +val shuffleQueryStageExecs = collect(adaptivePlan) { + case sqse: ShuffleQueryStageExec => sqse +} +assert(shuffleQueryStageExecs.length == 3, s"Physical Plan should include " + + s"3 ShuffleQueryStages. Physical Plan: $adaptivePlan") +shuffleQueryStageExecs.foreach(sqse => assert(sqse.name.contains("ShuffleQueryStageExec-"))) +// First ShuffleQueryStage is materialized so it needs to be canceled. +assert(shuffleQueryStageExecs(0).isMaterializationStarted(), + "Materialization should be started.") +// Second ShuffleQueryStage materialization is failed so +// it is excluded from the cancellation due to earlyFailedStage. +assert(shuffleQueryStageExecs(1).isMaterializationStarted(), + "Materialization should be started but it is failed.") +// Last ShuffleQueryStage is not materialized yet so it does not require +// to be canceled and it is just skipped from the cancellation. +assert(!shuffleQueryStageExecs(2).isMaterializationStarted(), + "Materialization should not be started.") + } +} finally { + spark.experimental.extraStrategies = Nil +} + } + + test("SPARK-47148: Check if BroadcastQueryStage materialization is started") { +try { + spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil + withSQLConf( +SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", +SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") { +val joinedDF = createJoinedDF() + +val error = intercept[SparkException] { + joinedDF.collect() +} +assert(error.getMessage() contains "ProblematicCoalesce execution is failed") + +val adaptivePlan = joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec] + +// All QueryStages should be based on BroadcastQueryStageExec +val broadcastQueryStageExecs = collect(adaptivePlan) { + case bqse: BroadcastQueryStageExec => bqse +} +assert(broadcastQueryStageExecs.length == 2, adaptivePlan) +broadcastQueryStageExecs.foreach { bqse => + assert(bqse.name.contains("BroadcastQueryStageExec-")) + // Both BroadcastQueryStages are materialized at the beginning. Review Comment: I have added `BROADCAST` hint, however, second BroadcastQueryStage materialization still seems to be kicked off. Please see last commit. 
## sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala: ## @@ -897,6 +900,85 @@ class AdaptiveQueryExecSuite } } + test("SPARK-47148: AQE should avoid to materialize ShuffleQueryStage on the cancellation") { +try { + spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil + withSQLConf( +SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", +SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true", +SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { +val joinedDF = createJoinedDF() + +val error = intercept[SparkException] { + joinedDF.collect() +} +assert(error.getMessage() contains "ProblematicCoalesce execution is failed") + +val adaptivePlan = joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec] + +// All QueryStages should be based on ShuffleQueryStageExec +val shuffleQueryStageExecs = collect(adaptivePlan) { + case sqse: ShuffleQueryStageExec => sqse +} +assert(shuffleQueryStageExecs.length == 3, s"Physical Plan should include " + + s"3 ShuffleQueryStages. Physical Plan: $adaptivePlan") +shuffleQueryStageExecs.foreach(sqse => assert(sqse.name.contains("ShuffleQueryStageExec-"))) +// First
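For readers following along, a broadcast hint of the kind mentioned in the comment above is typically attached as in this minimal PySpark sketch; the toy tables are made up and unrelated to the PR's test code.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()
df1 = spark.range(100).withColumnRenamed("id", "key")
df2 = spark.range(10).withColumnRenamed("id", "key")

# Function-style hint; df1.join(df2.hint("broadcast"), "key") is equivalent.
joined = df1.join(broadcast(df2), "key")
joined.explain()  # the plan should show a BroadcastHashJoin
```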
Re: [PR] [WIP] Only test rocksdbjni 9.x [spark]
panbingkun commented on PR #46146: URL: https://github.com/apache/spark/pull/46146#issuecomment-2067487411 At present, we are only testing the `rocksdbjni` `9.x` series in advance.
[PR] [WIP] Only test rocksdbjni 9.x [spark]
panbingkun opened a new pull request, #46146: URL: https://github.com/apache/spark/pull/46146 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-47148][SQL] Avoid to materialize AQE ExchangeQueryStageExec on the cancellation [spark]
erenavsarogullari commented on code in PR #45234: URL: https://github.com/apache/spark/pull/45234#discussion_r1573095590

## sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
## @@ -897,6 +900,85 @@ class AdaptiveQueryExecSuite
   }
 }

+  test("SPARK-47148: AQE should avoid to materialize ShuffleQueryStage on the cancellation") {
+    try {
+      spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+        SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true",

Review Comment: Addressed

## sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
## @@ -897,6 +900,85 @@ class AdaptiveQueryExecSuite
   }
 }

+  test("SPARK-47148: AQE should avoid to materialize ShuffleQueryStage on the cancellation") {
+    try {
+      spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+        SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true",
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+        val joinedDF = createJoinedDF()
+
+        val error = intercept[SparkException] {
+          joinedDF.collect()
+        }
+        assert(error.getMessage() contains "ProblematicCoalesce execution is failed")
+
+        val adaptivePlan = joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+
+        // All QueryStages should be based on ShuffleQueryStageExec
+        val shuffleQueryStageExecs = collect(adaptivePlan) {
+          case sqse: ShuffleQueryStageExec => sqse
+        }
+        assert(shuffleQueryStageExecs.length == 3, s"Physical Plan should include " +
+          s"3 ShuffleQueryStages. Physical Plan: $adaptivePlan")
+        shuffleQueryStageExecs.foreach(sqse => assert(sqse.name.contains("ShuffleQueryStageExec-")))
+        // First ShuffleQueryStage is materialized so it needs to be canceled.
+        assert(shuffleQueryStageExecs(0).isMaterializationStarted(),
+          "Materialization should be started.")
+        // Second ShuffleQueryStage materialization is failed so
+        // it is excluded from the cancellation due to earlyFailedStage.
+        assert(shuffleQueryStageExecs(1).isMaterializationStarted(),
+          "Materialization should be started but it is failed.")
+        // Last ShuffleQueryStage is not materialized yet so it does not require
+        // to be canceled and it is just skipped from the cancellation.
+        assert(!shuffleQueryStageExecs(2).isMaterializationStarted(),
+          "Materialization should not be started.")
+      }
+    } finally {
+      spark.experimental.extraStrategies = Nil
+    }
+  }
+
+  test("SPARK-47148: Check if BroadcastQueryStage materialization is started") {
+    try {
+      spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+        SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {

Review Comment: Addressed
Re: [PR] [SPARK-47909][PYTHON][CONNECT] Parent DataFrame class for Spark Connect and Spark Classic [spark]
HyukjinKwon commented on code in PR #46129: URL: https://github.com/apache/spark/pull/46129#discussion_r1573095371 ## python/pyspark/sql/classic/dataframe.py: ## @@ -0,0 +1,1974 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import json +import sys +import random +import warnings +from collections.abc import Iterable +from functools import reduce +from typing import ( +Any, +Callable, +Dict, +Iterator, +List, +Optional, +Sequence, +Tuple, +Type, +Union, +cast, +overload, +TYPE_CHECKING, +) + +from pyspark import _NoValue +from pyspark.resource import ResourceProfile +from pyspark._globals import _NoValueType +from pyspark.errors import ( +PySparkTypeError, +PySparkValueError, +PySparkIndexError, +PySparkAttributeError, +) +from pyspark.util import ( +is_remote_only, +_load_from_socket, +_local_iterator_from_socket, +) +from pyspark.serializers import BatchedSerializer, CPickleSerializer, UTF8Deserializer +from pyspark.storagelevel import StorageLevel +from pyspark.traceback_utils import SCCallSiteSync +from pyspark.sql.column import Column, _to_seq, _to_list, _to_java_column +from pyspark.sql.readwriter import DataFrameWriter, DataFrameWriterV2 +from pyspark.sql.streaming import DataStreamWriter +from pyspark.sql.types import ( +StructType, +Row, +_parse_datatype_json_string, +) +from pyspark.sql.dataframe import ( +DataFrame as ParentDataFrame, +DataFrameNaFunctions as ParentDataFrameNaFunctions, +DataFrameStatFunctions as ParentDataFrameStatFunctions, +) +from pyspark.sql.utils import get_active_spark_context, toJArray +from pyspark.sql.pandas.conversion import PandasConversionMixin +from pyspark.sql.pandas.map_ops import PandasMapOpsMixin + +if TYPE_CHECKING: +from py4j.java_gateway import JavaObject +from pyspark.core.rdd import RDD +from pyspark.core.context import SparkContext +from pyspark._typing import PrimitiveType +from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame +from pyspark.sql._typing import ( +ColumnOrName, +ColumnOrNameOrOrdinal, +LiteralType, +OptionalPrimitiveType, +) +from pyspark.sql.context import SQLContext +from pyspark.sql.session import SparkSession +from pyspark.sql.group import GroupedData +from pyspark.sql.observation import Observation + + +class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin): +def __new__( +cls, +jdf: "JavaObject", +sql_ctx: Union["SQLContext", "SparkSession"], +) -> "DataFrame": +self = object.__new__(cls) +self.__init__(jdf, sql_ctx) # type: ignore[misc] +return self + +def __init__( +self, +jdf: "JavaObject", +sql_ctx: Union["SQLContext", "SparkSession"], +): +from pyspark.sql.context import SQLContext + +self._sql_ctx: Optional["SQLContext"] = None + +if isinstance(sql_ctx, SQLContext): +assert not os.environ.get("SPARK_TESTING") # Sanity check for 
our internal usage. +assert isinstance(sql_ctx, SQLContext) +# We should remove this if-else branch in the future release, and rename +# sql_ctx to session in the constructor. This is an internal code path but +# was kept with a warning because it's used intensively by third-party libraries. +warnings.warn("DataFrame constructor is internal. Do not directly use it.") +self._sql_ctx = sql_ctx +session = sql_ctx.sparkSession +else: +session = sql_ctx +self._session: "SparkSession" = session + +self._sc: "SparkContext" = sql_ctx._sc +self._jdf: "JavaObject" = jdf +self.is_cached = False +# initialized lazily +self._schema: Optional[StructType] = None +self._lazy_rdd: Optional["RDD[Row]"] = None +# Check whether _repr_html is supported or not, we use it to avoid calling _jdf twice +# by __repr__ and _repr_html_ while eager evaluation opens. +self._support_repr_html = False + +@property +def
Re: [PR] [SPARK-47909][PYTHON][CONNECT] Parent DataFrame class for Spark Connect and Spark Classic [spark]
HyukjinKwon commented on code in PR #46129: URL: https://github.com/apache/spark/pull/46129#discussion_r1573095413 ## python/pyspark/sql/classic/dataframe.py: ## @@ -0,0 +1,1974 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import json +import sys +import random +import warnings +from collections.abc import Iterable +from functools import reduce +from typing import ( +Any, +Callable, +Dict, +Iterator, +List, +Optional, +Sequence, +Tuple, +Type, +Union, +cast, +overload, +TYPE_CHECKING, +) + +from pyspark import _NoValue +from pyspark.resource import ResourceProfile +from pyspark._globals import _NoValueType +from pyspark.errors import ( +PySparkTypeError, +PySparkValueError, +PySparkIndexError, +PySparkAttributeError, +) +from pyspark.util import ( +is_remote_only, +_load_from_socket, +_local_iterator_from_socket, +) +from pyspark.serializers import BatchedSerializer, CPickleSerializer, UTF8Deserializer +from pyspark.storagelevel import StorageLevel +from pyspark.traceback_utils import SCCallSiteSync +from pyspark.sql.column import Column, _to_seq, _to_list, _to_java_column +from pyspark.sql.readwriter import DataFrameWriter, DataFrameWriterV2 +from pyspark.sql.streaming import DataStreamWriter +from pyspark.sql.types import ( +StructType, +Row, +_parse_datatype_json_string, +) +from pyspark.sql.dataframe import ( +DataFrame as ParentDataFrame, +DataFrameNaFunctions as ParentDataFrameNaFunctions, +DataFrameStatFunctions as ParentDataFrameStatFunctions, +) +from pyspark.sql.utils import get_active_spark_context, toJArray +from pyspark.sql.pandas.conversion import PandasConversionMixin +from pyspark.sql.pandas.map_ops import PandasMapOpsMixin + +if TYPE_CHECKING: +from py4j.java_gateway import JavaObject +from pyspark.core.rdd import RDD +from pyspark.core.context import SparkContext +from pyspark._typing import PrimitiveType +from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame +from pyspark.sql._typing import ( +ColumnOrName, +ColumnOrNameOrOrdinal, +LiteralType, +OptionalPrimitiveType, +) +from pyspark.sql.context import SQLContext +from pyspark.sql.session import SparkSession +from pyspark.sql.group import GroupedData +from pyspark.sql.observation import Observation + + +class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin): +def __new__( +cls, +jdf: "JavaObject", +sql_ctx: Union["SQLContext", "SparkSession"], +) -> "DataFrame": +self = object.__new__(cls) +self.__init__(jdf, sql_ctx) # type: ignore[misc] +return self + +def __init__( +self, +jdf: "JavaObject", +sql_ctx: Union["SQLContext", "SparkSession"], +): +from pyspark.sql.context import SQLContext + +self._sql_ctx: Optional["SQLContext"] = None + +if isinstance(sql_ctx, SQLContext): +assert not os.environ.get("SPARK_TESTING") # Sanity check for 
our internal usage. +assert isinstance(sql_ctx, SQLContext) +# We should remove this if-else branch in the future release, and rename +# sql_ctx to session in the constructor. This is an internal code path but +# was kept with a warning because it's used intensively by third-party libraries. +warnings.warn("DataFrame constructor is internal. Do not directly use it.") +self._sql_ctx = sql_ctx +session = sql_ctx.sparkSession +else: +session = sql_ctx +self._session: "SparkSession" = session + +self._sc: "SparkContext" = sql_ctx._sc +self._jdf: "JavaObject" = jdf +self.is_cached = False +# initialized lazily +self._schema: Optional[StructType] = None +self._lazy_rdd: Optional["RDD[Row]"] = None +# Check whether _repr_html is supported or not, we use it to avoid calling _jdf twice +# by __repr__ and _repr_html_ while eager evaluation opens. +self._support_repr_html = False + +@property +def
Re: [PR] [SPARK-47909][PYTHON][CONNECT] Parent DataFrame class for Spark Connect and Spark Classic [spark]
HyukjinKwon commented on code in PR #46129: URL: https://github.com/apache/spark/pull/46129#discussion_r1573094548

## python/pyspark/sql/utils.py:
## @@ -302,6 +302,33 @@ def wrapped(*args: Any, **kwargs: Any) -> Any:
 return cast(FuncT, wrapped)

+def dispatch_df_method(f: FuncT) -> FuncT:
+    """
+    For the usecases of direct DataFrame.union(df, ...), it checks if self
+    is a Connect DataFrame or Classic DataFrame, and dispatches.
+    """
+
+    @functools.wraps(f)
+    def wrapped(*args: Any, **kwargs: Any) -> Any:
+        if is_remote() and "PYSPARK_NO_NAMESPACE_SHARE" not in os.environ:
+            from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame
+
+            if isinstance(args[0], ConnectDataFrame):
+                return getattr(ConnectDataFrame, f.__name__)(*args, **kwargs)
+        else:
+            from pyspark.sql.classic.dataframe import DataFrame as ClassicDataFrame

Review Comment: It should be covered by `is_remote` which is `True` when `is_remote_only` is `True`.
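As a loose illustration of the dispatch pattern under discussion, here is a self-contained toy model, not PySpark's actual classes: a method invoked through the parent class resolves to the concrete subclass of its first argument, which is the same idea `dispatch_df_method` applies to Classic and Connect DataFrames.

```python
import functools


def dispatch_method(f):
    """Toy dispatcher: route a parent-class method to the receiver's subclass."""
    @functools.wraps(f)
    def wrapped(self, *args, **kwargs):
        # type(self) is the concrete class (the Classic/Connect analogue here),
        # so both df.union(other) and ParentFrame.union(df, other) end up there.
        return getattr(type(self), "_" + f.__name__)(self, *args, **kwargs)
    return wrapped


class ParentFrame:
    @dispatch_method
    def union(self, other):
        ...


class ClassicFrame(ParentFrame):
    def __init__(self, name):
        self.name = name

    def _union(self, other):
        return f"classic union({self.name}, {other.name})"


a, b = ClassicFrame("a"), ClassicFrame("b")
assert a.union(b) == ParentFrame.union(a, b) == "classic union(a, b)"
```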
Re: [PR] [SPARK-47909][PYTHON][CONNECT] Parent DataFrame class for Spark Connect and Spark Classic [spark]
HyukjinKwon commented on code in PR #46129: URL: https://github.com/apache/spark/pull/46129#discussion_r1573094799

## python/pyspark/sql/connect/session.py:
## @@ -325,7 +325,7 @@
 active.__doc__ = PySparkSession.active.__doc__

-    def table(self, tableName: str) -> DataFrame:
+    def table(self, tableName: str) -> ParentDataFrame:

Review Comment: this was the way MyPy least complained IIRC. Let me take a look again ..
[PR] [SPARK-47925][SQL][TESTS] Mark `BloomFilterAggregateQuerySuite` as `ExtendedSQLTest` [spark]
dongjoon-hyun opened a new pull request, #46145: URL: https://github.com/apache/spark/pull/46145 … ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45977: URL: https://github.com/apache/spark/pull/45977#discussion_r1573092072

## python/pyspark/sql/datasource.py:
## @@ -183,11 +186,40 @@ def streamWriter(self, schema: StructType, overwrite: bool) -> "DataSourceStream
             message_parameters={"feature": "streamWriter"},
         )

+    def _streamReader(self, schema: StructType) -> "DataSourceStreamReader":

Review Comment: This is a private method to fall back to simple reader when streaming reader is not implemented. I would be ok to change this if there is an alternative.
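A speculative sketch of that fallback, with stand-in names: `NotImplementedYet` and `SimpleWrapper` replace the PR's `PySparkNotImplementedError` and `_SimpleStreamReaderWrapper`, and `simpleStreamReader` is assumed to be the user-facing hook alongside `streamReader`.

```python
class NotImplementedYet(Exception):
    """Stand-in for PySparkNotImplementedError in this sketch."""


class SimpleWrapper:
    """Stand-in for _SimpleStreamReaderWrapper: adapts a simple reader."""
    def __init__(self, simple_reader):
        self.simple_reader = simple_reader


class DataSource:
    def streamReader(self, schema):
        raise NotImplementedYet("streamReader")

    def simpleStreamReader(self, schema):
        return object()  # a SimpleDataSourceStreamReader in the real code

    def _streamReader(self, schema):
        # Prefer the full stream reader; if the user only implemented the
        # simple one, wrap it so the engine sees an ordinary stream reader.
        try:
            return self.streamReader(schema)
        except NotImplementedYet:
            return SimpleWrapper(self.simpleStreamReader(schema))


assert isinstance(DataSource()._streamReader(schema=None), SimpleWrapper)
```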
Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45977: URL: https://github.com/apache/spark/pull/45977#discussion_r1573085074

## python/pyspark/sql/datasource.py:
## @@ -469,6 +501,200 @@ def stop(self) -> None:
 ...

+class SimpleInputPartition(InputPartition):
+    def __init__(self, start: dict, end: dict):
+        self.start = start
+        self.end = end
+
+
+class PrefetchedCacheEntry(InputPartition):

Review Comment: Where should we move these functions to if we don't want them to be public API?
Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45977: URL: https://github.com/apache/spark/pull/45977#discussion_r1573085009

## python/pyspark/sql/datasource.py:
## @@ -469,6 +501,200 @@ def stop(self) -> None:
 ...

+class SimpleInputPartition(InputPartition):

Review Comment: Simple reader doesn't expose partitioning to the user, so this is only used for the wrapper to make the simple reader integrate with the streaming engine. Where should we move this internal code if we don't want it to be public API?
[PR] [SPARK-47924][CORE] Add a DEBUG log to `DiskStore.moveFileToBlock` [spark]
dongjoon-hyun opened a new pull request, #46144: URL: https://github.com/apache/spark/pull/46144 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-19335][SPARK-38200][SQL] Add upserts for writing to JDBC [spark]
github-actions[bot] commented on PR #41518: URL: https://github.com/apache/spark/pull/41518#issuecomment-2067417265 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
Re: [PR] [SPARK-47922][SQL] Implement the try_parse_json expression [spark]
harshmotw-db commented on PR #46141: URL: https://github.com/apache/spark/pull/46141#issuecomment-2067417422 cc @chenhao-db @cloud-fan
Re: [PR] [SPARK-45709][BUILD] Deploy packages when all packages are built [spark]
github-actions[bot] commented on PR #43561: URL: https://github.com/apache/spark/pull/43561#issuecomment-2067417258 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
[PR] [WIP][SPARK-47672][SQL] Avoid double eval from filter pushDown w/ projection pushdown [spark]
holdenk opened a new pull request, #46143: URL: https://github.com/apache/spark/pull/46143

### What changes were proposed in this pull request?
Changes the filter pushDown optimizer to not push down past projections of the same element if we reasonably expect that computing that element is likely to be expensive. This is a more complex alternative to https://github.com/apache/spark/pull/45802, which also moves parts of projections down so that the filters can move further down. This introduces an "expectedCost" mechanism which we may or may not want. Previous filter ordering work used filter pushdowns as an approximation of expression cost, but here we need more granularity. As an alternative we could introduce a flag for expensive rather than numeric operations. Another alternative would be seeing if the predicate can be "converted" as a proxy for cheap.

### Future Work / What else remains to do?
Right now if a condition is expensive and it references something in the projection we don't push down. We could probably do better and gate this on whether the thing we reference is expensive rather than the condition itself. We could do this as a follow-up item or as part of this PR.

### Why are the changes needed?
Currently Spark may double-compute expensive operations (like JSON parsing, UDF evaluation, etc.) as a result of filter pushdown past projections (see the sketch after this message).

### Does this PR introduce _any_ user-facing change?
SQL optimizer change may impact some user queries; results should be the same and hopefully a little faster.

### How was this patch tested?
New tests were added to FilterPushDownSuite, and the initial problem of double evaluation was confirmed with a GitHub gist.

### Was this patch authored or co-authored using generative AI tooling?
No
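To illustrate the double-evaluation problem the PR targets, here is a hedged PySpark sketch; the column names and schema are made up, and depending on the Spark version the optimized plan may or may not show the duplication.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('{"a": 1}',), ('{"a": 2}',)], ["raw"])

# Project an expensive expression, then filter on its result.
parsed = df.select(F.from_json("raw", "a INT").alias("j"))
filtered = parsed.filter(F.col("j.a") > 1)

# Without the proposed change, pushing the filter below the projection can
# duplicate the from_json call: once in the predicate, once in the output.
filtered.explain(True)
```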
[PR] [SPARK-47923][R] Upgrade the minimum version of `arrow` R package to 10.0.0 [spark]
dongjoon-hyun opened a new pull request, #46142: URL: https://github.com/apache/spark/pull/46142 … ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47903][PYTHON] Add support for remaining scalar types in the PySpark Variant library [spark]
harshmotw-db commented on code in PR #46122: URL: https://github.com/apache/spark/pull/46122#discussion_r1573032141 ## python/pyspark/sql/types.py: ## @@ -1521,6 +1521,19 @@ def toPython(self) -> Any: """ return VariantUtils.to_python(self.value, self.metadata) +def toJson(self, zone_id: str = "UTC") -> str: Review Comment: Resolved -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47903][PYTHON] Add support for remaining scalar types in the PySpark Variant library [spark]
harshmotw-db commented on code in PR #46122: URL: https://github.com/apache/spark/pull/46122#discussion_r1573031748 ## python/pyspark/sql/types.py: ## @@ -1521,6 +1521,19 @@ def toPython(self) -> Any: """ return VariantUtils.to_python(self.value, self.metadata) +def toJson(self, zone_id: str = "UTC") -> str: Review Comment: Thanks! I was trying to figure out why document generation was failing in the linting jobs. I thought it was a syntax error in Sphinx -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47903][PYTHON] Add support for remaining scalar types in the PySpark Variant library [spark]
gene-db commented on code in PR #46122: URL: https://github.com/apache/spark/pull/46122#discussion_r1573029280 ## python/pyspark/sql/types.py: ## @@ -1521,6 +1521,19 @@ def toPython(self) -> Any: """ return VariantUtils.to_python(self.value, self.metadata) +def toJson(self, zone_id: str = "UTC") -> str: Review Comment: You might need to update this https://github.com/databricks/runtime/blob/master/python/docs/source/reference/pyspark.sql/variant_val.rst?plain=1#L27 to generate the docs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [WIP] Testing that error is propagated to user upon deserialization [spark]
ericm-db commented on code in PR #46125: URL: https://github.com/apache/spark/pull/46125#discussion_r1573000267 ## python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py: ## @@ -66,6 +66,30 @@ def func(df, _): q = df.writeStream.foreachBatch(func).start() q.processAllAvailable() +def test_pickling_deserialization_error(self): +class NoUnpickle: +def __init__(self, data): +self.data = data + +def __reduce__(self): +# Serialize only the data attribute +return (self.__class__, (self.data,)) Review Comment: Ah yeah, will make the refactor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [WIP] Testing that error is propagated to user upon deserialization [spark]
rangadi commented on code in PR #46125: URL: https://github.com/apache/spark/pull/46125#discussion_r1572997060 ## python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py: ## @@ -66,6 +66,30 @@ def func(df, _): q = df.writeStream.foreachBatch(func).start() q.processAllAvailable() +def test_pickling_deserialization_error(self): +class NoUnpickle: +def __init__(self, data): +self.data = data + +def __reduce__(self): +# Serialize only the data attribute +return (self.__class__, (self.data,)) Review Comment: I think here, you should return a function that throws. ``` def __reduce__(self): return (error_fn, ()) def error_fn(): raise ... ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
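A self-contained sketch of the pattern the reviewer suggests (names are illustrative, not from the PR). The raising function must live at module level so pickle can reference it by name, and the error then fires during unpickling rather than pickling:

```python
import pickle

def error_fn():
    # Invoked when the object is rebuilt, i.e. during deserialization.
    raise RuntimeError("failed to deserialize")

class NoUnpickle:
    def __reduce__(self):
        # Pickle a call to error_fn instead of the real constructor.
        return (error_fn, ())

payload = pickle.dumps(NoUnpickle())  # pickling succeeds
pickle.loads(payload)                 # raises RuntimeError here
```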
Re: [PR] [WIP] Testing that error is propagated to user upon deserialization [spark]
rangadi commented on code in PR #46125: URL: https://github.com/apache/spark/pull/46125#discussion_r1572997327 ## python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py: ## @@ -66,6 +66,30 @@ def func(df, _): q = df.writeStream.foreachBatch(func).start() q.processAllAvailable() +def test_pickling_deserialization_error(self): +class NoUnpickle: +def __init__(self, data): +self.data = data + +def __reduce__(self): +# Serialize only the data attribute +return (self.__class__, (self.data,)) + +def __reduce_ex__(self, proto): Review Comment: remove this to avoid confusion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [WIP] Testing that error is propagated to user upon deserialization [spark]
rangadi commented on code in PR #46125: URL: https://github.com/apache/spark/pull/46125#discussion_r1572995277 ## python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py: ## @@ -63,8 +63,13 @@ def main(infile: IO, outfile: IO) -> None: spark = spark_connect_session # TODO(SPARK-44461): Enable Process Isolation +try: Review Comment: Can this try be around the while loop below? I.e. ``` try: func = worker.read_command(pickle_ser, infile) write_int... while True: .. except ``` Note that currently, errors around `read_long()` are not handled well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[PR] Try parse json [spark]
harshmotw-db opened a new pull request, #46141: URL: https://github.com/apache/spark/pull/46141 ### What changes were proposed in this pull request? This pull request implements the `try_parse_json` expression, which runs `parse_json` on string expressions to extract variants. However, if `parse_json` throws an exception on a row, the value `null` is returned. ### Why are the changes needed? Sometimes, columns containing JSON strings may contain some invalid inputs that should be ignored instead of failing the whole execution because of them. ### Does this PR introduce _any_ user-facing change? Yes, it allows users to run the `try_parse_json` expression. ### How was this patch tested? Unit tests to check that `try_parse_json` works just like `parse_json` on valid inputs, returns `null` on invalid inputs, and fails on incorrect input data types. ### Was this patch authored or co-authored using generative AI tooling? No -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
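A hedged sketch of the intended behavior, based only on the PR description above (the exact rendering of the variant value is an assumption): valid JSON rows yield a variant, malformed rows yield `null` instead of failing the query:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('{"a": 1}',), ("not json",)], ["s"])

# parse_json would fail the whole query on the malformed row;
# try_parse_json is expected to yield NULL for that row instead.
df.selectExpr("try_parse_json(s) AS v").show()
```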
Re: [PR] [SPARK-47909][PYTHON][CONNECT] Parent DataFrame class for Spark Connect and Spark Classic [spark]
ueshin commented on code in PR #46129: URL: https://github.com/apache/spark/pull/46129#discussion_r1572787315 ## python/pyspark/sql/dataframe.py: ## @@ -139,51 +123,29 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): created via using the constructor. """ -def __init__( -self, +# HACK ALERT!! this is to reduce the backward compatibility concern, and returns +# Spark Classic DataFrame by default. This is NOT an API, and NOT supposed to +# be directly invoked. DO NOT use this constructor. +_sql_ctx: Optional["SQLContext"] +_session: "SparkSession" +_sc: "SparkContext" +_jdf: "JavaObject" +is_cached: bool +_schema: Optional[StructType] +_lazy_rdd: Optional["RDD[Row]"] +_support_repr_html: bool + +def __new__( +cls, jdf: "JavaObject", sql_ctx: Union["SQLContext", "SparkSession"], -): -from pyspark.sql.context import SQLContext - -self._sql_ctx: Optional["SQLContext"] = None - -if isinstance(sql_ctx, SQLContext): -assert not os.environ.get("SPARK_TESTING") # Sanity check for our internal usage. -assert isinstance(sql_ctx, SQLContext) -# We should remove this if-else branch in the future release, and rename -# sql_ctx to session in the constructor. This is an internal code path but -# was kept with a warning because it's used intensively by third-party libraries. -warnings.warn("DataFrame constructor is internal. Do not directly use it.") -self._sql_ctx = sql_ctx -session = sql_ctx.sparkSession -else: -session = sql_ctx -self._session: "SparkSession" = session - -self._sc: "SparkContext" = sql_ctx._sc -self._jdf: "JavaObject" = jdf -self.is_cached = False -# initialized lazily -self._schema: Optional[StructType] = None -self._lazy_rdd: Optional["RDD[Row]"] = None -# Check whether _repr_html is supported or not, we use it to avoid calling _jdf twice -# by __repr__ and _repr_html_ while eager evaluation opens. -self._support_repr_html = False - -@property -def sql_ctx(self) -> "SQLContext": -from pyspark.sql.context import SQLContext +) -> "DataFrame": +from pyspark.sql.classic.dataframe import DataFrame -warnings.warn( -"DataFrame.sql_ctx is an internal property, and will be removed " -"in future releases. Use DataFrame.sparkSession instead." -) -if self._sql_ctx is None: -self._sql_ctx = SQLContext._get_or_create(self._sc) -return self._sql_ctx +return DataFrame.__new__(DataFrame, jdf, sql_ctx) @property +@dispatch_df_method Review Comment: The dispatch for `property` doesn't seem to work. ```py >>> class A: ... @property ... @dispatch_df_method ... def a(self): ... return 1 >>> >>> a = A() >>> A.a(a) Traceback (most recent call last): File "<stdin>", line 1, in <module> TypeError: 'property' object is not callable ``` I don't think we need this for `property` as this usage of `property` won't work anyway? ## python/pyspark/sql/classic/dataframe.py: ## @@ -0,0 +1,1974 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import json +import sys +import random +import warnings +from collections.abc import Iterable +from functools import reduce +from typing import ( +Any, +Callable, +Dict, +Iterator, +List, +Optional, +Sequence, +Tuple, +Type, +Union, +cast, +overload, +TYPE_CHECKING, +) + +from pyspark import _NoValue +from pyspark.resource import ResourceProfile +from pyspark._globals import _NoValueType +from pyspark.errors import ( +PySparkTypeError, +PySparkValueError, +PySparkIndexError, +PySparkAttributeError, +) +from pyspark.util import ( +is_remote_only, +_load_from_socket, +_local_iterator_from_socket, +) +from pyspark.serializers import BatchedSerializer, CPickleSerializer, UTF8Deserializer +from pyspark.storagelevel import
Re: [PR] [SPARK-47921][CONNECT] Fix ExecuteJobTag creation in ExecuteHolder [spark]
allisonwang-db commented on PR #46140: URL: https://github.com/apache/spark/pull/46140#issuecomment-2067273091 cc @jasonli-db @HyukjinKwon -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[PR] [SPARK-47921][CONNECT] Fix ExecuteJobTag creation in ExecuteHolder [spark]
allisonwang-db opened a new pull request, #46140: URL: https://github.com/apache/spark/pull/46140 ### What changes were proposed in this pull request? This PR fixes a bug in the ExecuteJobTag creation in ExecuteHolder. The sessionId and userId are reversed. ### Why are the changes needed? To fix a bug ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing test ### Was this patch authored or co-authored using generative AI tooling? No -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45977: URL: https://github.com/apache/spark/pull/45977#discussion_r1572915019 ## python/pyspark/sql/datasource.py: ## @@ -183,11 +186,40 @@ def streamWriter(self, schema: StructType, overwrite: bool) -> "DataSourceStream message_parameters={"feature": "streamWriter"}, ) +def _streamReader(self, schema: StructType) -> "DataSourceStreamReader": +try: +return self.streamReader(schema=schema) +except PySparkNotImplementedError: +return _SimpleStreamReaderWrapper(self.simpleStreamReader(schema=schema)) Review Comment: Makes sense! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45977: URL: https://github.com/apache/spark/pull/45977#discussion_r1572914583 ## python/pyspark/sql/datasource.py: ## @@ -469,6 +501,200 @@ def stop(self) -> None: ... +class SimpleInputPartition(InputPartition): +def __init__(self, start: dict, end: dict): +self.start = start +self.end = end + + +class PrefetchedCacheEntry(InputPartition): +def __init__(self, start: dict, end: dict, it: Iterator[Tuple]): +self.start = start +self.end = end +self.it = it + + +class SimpleDataSourceStreamReader(ABC): +""" +A base class for simplified streaming data source readers. +Compared to :class:`DataSourceStreamReader`, :class:`SimpleDataSourceStreamReader` doesn't +require planning data partitions. Also, the read API of :class:`SimpleDataSourceStreamReader` +allows reading data and planning the latest offset at the same time. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict or recursive dict whose key and value are primitive types, which include +Integer, String and Boolean. + +Examples + +>>> def initialOffset(self): +... return {"partition-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}} +""" +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "initialOffset"}, +) + +def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]: +""" +Read all available data from start offset and return the offset that next read attempt +starts from. + +Parameters +-- +start : dict +The start offset to start reading from. + +Returns +--- +A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s +The iterator contains all the available records after start offset. +The dict is the end offset of this read attempt and the start of next read attempt. +""" +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "read"}, +) + +def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]: +""" +Read all available data from specific start offset and end offset. +This is invoked during failure recovery to re-read a batch deterministically +in order to achieve exactly once. + +Parameters +-- +start : dict +The start offset to start reading from. + +end : dict +The offset where the reading stops. + +Returns +--- +iterator of :class:`Tuple`\\s +All the records between start offset and end offset. +""" +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "read2"}, +) + +def commit(self, end: dict) -> None: +""" +Informs the source that Spark has completed processing all data for offsets less than or +equal to `end` and will only request offsets greater than `end` in the future. + +Parameters +-- +end : dict +The latest offset that the streaming query has processed for this source. +""" +... + + +class _SimpleStreamReaderWrapper(DataSourceStreamReader): +""" +A private class that wraps :class:`SimpleDataSourceStreamReader` in a prefetch-and-cache pattern, +so that :class:`SimpleDataSourceStreamReader` can integrate with the streaming engine like an +ordinary :class:`DataSourceStreamReader`.
+ +current_offset tracks the latest progress of the record prefetching; it is initialized to be +initialOffset() when the query starts for the first time, or initialized to be the end offset of +the last committed batch when the query restarts. + +When the streaming engine calls latestOffset(), the wrapper calls read() starting from +current_offset, prefetches and caches the data, then updates the current_offset to be +the end offset of the new data. + +When the streaming engine calls planInputPartitions(start, end), the wrapper gets the prefetched data +from the cache and sends it to the JVM along with the input partitions. + +When the query restarts, batches in the write-ahead offset log that have not been committed will be +replayed by reading data between start and end offsets through read2(start, end).
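To make the contract above concrete, here is a minimal, hypothetical `SimpleDataSourceStreamReader` (the counter source and the batch size of ten are invented): `read()` returns the new rows plus the next start offset, and `readBetweenOffsets()` deterministically replays a range on recovery:

```python
from pyspark.sql.datasource import SimpleDataSourceStreamReader

class CounterStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self) -> dict:
        return {"offset": 0}

    def read(self, start: dict):
        begin = start["offset"]
        end = begin + 10  # pretend ten new records arrived
        rows = ((i,) for i in range(begin, end))
        # Return the records plus the start offset of the next read attempt.
        return rows, {"offset": end}

    def readBetweenOffsets(self, start: dict, end: dict):
        # Deterministic replay of a previously planned range.
        return ((i,) for i in range(start["offset"], end["offset"]))
```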
Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]
allisonwang-db commented on code in PR #45977: URL: https://github.com/apache/spark/pull/45977#discussion_r1572908487 ## python/pyspark/sql/datasource.py: ## @@ -183,11 +186,40 @@ def streamWriter(self, schema: StructType, overwrite: bool) -> "DataSourceStream message_parameters={"feature": "streamWriter"}, ) +def _streamReader(self, schema: StructType) -> "DataSourceStreamReader": +try: +return self.streamReader(schema=schema) +except PySparkNotImplementedError: +return _SimpleStreamReaderWrapper(self.simpleStreamReader(schema=schema)) + +def simpleStreamReader(self, schema: StructType) -> "SimpleDataSourceStreamReader": +""" +Returns a :class:`SimpleDataSourceStreamReader` instance for reading data. + +One of simpleStreamReader() and streamReader() must be implemented for a readable streaming +data source. Review Comment: Can we be more explicit about when users should choose streamReader versus simpleStreamReader here? This information will be included in the API documentation for this class. ## python/pyspark/sql/worker/plan_data_source_read.py: ## @@ -51,6 +52,71 @@ ) + +def records_to_arrow_batches( +output_iter: Iterator[Tuple], +max_arrow_batch_size: int, +return_type: StructType, +data_source: DataSource, +) -> Iterable[pa.RecordBatch]: Review Comment: Let's add some docstring for this function. ## python/pyspark/sql/datasource.py: ## @@ -469,6 +501,200 @@ def stop(self) -> None: ... +class SimpleInputPartition(InputPartition): Review Comment: Why do we need this in the public API? Why can't users define their own input partition class? ## python/pyspark/sql/datasource.py: ## @@ -183,11 +186,40 @@ def streamWriter(self, schema: StructType, overwrite: bool) -> "DataSourceStream message_parameters={"feature": "streamWriter"}, ) +def _streamReader(self, schema: StructType) -> "DataSourceStreamReader": Review Comment: Why do we need this `_streamReader` in datasource API? ## python/pyspark/sql/datasource.py: ## @@ -469,6 +501,200 @@ def stop(self) -> None: ... +class SimpleInputPartition(InputPartition): +def __init__(self, start: dict, end: dict): +self.start = start +self.end = end + + +class PrefetchedCacheEntry(InputPartition): +def __init__(self, start: dict, end: dict, it: Iterator[Tuple]): +self.start = start +self.end = end +self.it = it + + +class SimpleDataSourceStreamReader(ABC): +""" +A base class for simplified streaming data source readers. +Compared to :class:`DataSourceStreamReader`, :class:`SimpleDataSourceStreamReader` doesn't +require planning data partitions. Also, the read API of :class:`SimpleDataSourceStreamReader` +allows reading data and planning the latest offset at the same time. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict or recursive dict whose key and value are primitive types, which include +Integer, String and Boolean. + +Examples + +>>> def initialOffset(self): +... return {"partition-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}} +""" +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "initialOffset"}, +) + +def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]: +""" +Read all available data from start offset and return the offset that next read attempt +starts from.
+ +Parameters +-- +start : dict +The start offset to start reading from. + +Returns +--- +A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s +The iterator contains all the available records after start offset. +The dict is the end offset of this read attempt and the start of next read attempt. +""" +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "read"}, +) + +def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]: +""" +Read all available data from specific start offset and end offset. +This is invoked during failure recovery to re-read a batch deterministically +in order to achieve exactly once. +
[PR] [SPARK-47920] add doc for python streaming data source API [spark]
chaoqin-li1123 opened a new pull request, #46139: URL: https://github.com/apache/spark/pull/46139 ### What changes were proposed in this pull request? Add documentation for the Python streaming data source API. ### Why are the changes needed? Add a user guide to help users develop Python streaming data sources. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] Operator 0.1.0 [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #2: URL: https://github.com/apache/spark-kubernetes-operator/pull/2#discussion_r1572840033 ## config/checkstyle/checkstyle.xml: ## @@ -0,0 +1,195 @@ + + +https://checkstyle.org/dtds/configuration_1_3.dtd"> + + + + Review Comment: Please make a spin-off PR for `Static Analysis`-related stuff, @jiangzho. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] Operator 0.1.0 [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #2: URL: https://github.com/apache/spark-kubernetes-operator/pull/2#discussion_r1572837826 ## build.gradle: ## @@ -1,3 +1,16 @@ +buildscript { + repositories { +maven { + url = uri("https://plugins.gradle.org/m2/") +} + } + dependencies { +classpath "com.github.spotbugs.snom:spotbugs-gradle-plugin:${spotBugsGradlePluginVersion}" + } +} + +assert JavaVersion.current().isJava11Compatible(): "Java 11 or newer is required" Review Comment: Please drop Java 11. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] Operator 0.1.0 [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #2: URL: https://github.com/apache/spark-kubernetes-operator/pull/2#discussion_r1572836874 ## build-tools/helm/spark-kubernetes-operator/values.yaml: ## @@ -0,0 +1,178 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +image: + repository: spark-kubernetes-operator + pullPolicy: IfNotPresent + # tag: latest + # If image digest is set then it takes precedence and the image tag will be ignored + # digest: "" + +imagePullSecrets: [ ] + +operatorDeployment: + # Replicas must be 1 + replicas: 1 + # Strategy type must be 'Recreate' unless leader election is configured + strategy: +type: Recreate + operatorPod: +priorityClassName: null +annotations: { } +labels: { } +affinity: { } +nodeSelector: { } +# Node tolerations for operator pod assignment +# https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/ +tolerations: [ ] +# Topology spread constrains +# https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/ +topologySpreadConstraints: [ ] +operatorContainer: + jvmArgs: "-XX:+UseG1GC -Xms3G -Xmx3G -Dfile.encoding=UTF8" + env: + envFrom: + volumeMounts: { } + resources: +limits: + cpu: "1" + ephemeral-storage: 2Gi + memory: 4Gi +requests: + cpu: "1" + ephemeral-storage: 2Gi + memory: 4Gi + probes: +port: 18080 +livenessProbe: + periodSeconds: 10 + initialDelaySeconds: 30 +startupProbe: + failureThreshold: 30 + periodSeconds: 10 + metrics: +port: 19090 + securityContext: +allowPrivilegeEscalation: false +capabilities: + drop: +- ALL +runAsNonRoot: true +runAsUser: +seccompProfile: + type: RuntimeDefault +additionalContainers: { } +# additionalContainers: +# - name: "" +#image: "" +volumes: { } +# volumes: +# - name: spark-artifacts +# hostPath: +# path: /tmp/spark/artifacts +# type: DirectoryOrCreate +securityContext: { } +dnsPolicy: +dnsConfig: + +operatorRbac: + serviceAccount: +create: true +name: "spark-operator" + # If disabled, a Role would be created inside each app namespace for app operations + clusterRole: +create: true +name: "spark-operator-clusterrole" + # If disabled, a RoleBinding would be created inside each app namespace for app operations + clusterRoleBinding: +create: true +name: "spark-operator-clusterrolebinding" + configManagement: +roleName: "spark-operator-config-role" +roleBindingName: "spark-operator-config-role-binding" + +appResources: + # Create namespace(s), service account(s) and rolebinding(s) for SparkApps, if configured + # Operator would act at cluster level by default if no app namespace(s) are provided + namespaces: +create: true +# When enabled, operator would by default only watch namespace(s) provided in data field +watchGivenNamespacesOnly: false +data: + #- "spark-demo" + #- "spark-demo" Review Comment: May I ask why we have the 
same comment twice? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] Operator 0.1.0 [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #2: URL: https://github.com/apache/spark-kubernetes-operator/pull/2#discussion_r1572836113 ## build-tools/helm/spark-kubernetes-operator/values.yaml: ## @@ -0,0 +1,178 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +image: + repository: spark-kubernetes-operator + pullPolicy: IfNotPresent + # tag: latest + # If image digest is set then it takes precedence and the image tag will be ignored + # digest: "" + +imagePullSecrets: [ ] + +operatorDeployment: + # Replicas must be 1 + replicas: 1 + # Strategy type must be 'Recreate' unless leader election is configured + strategy: +type: Recreate + operatorPod: +priorityClassName: null +annotations: { } +labels: { } +affinity: { } +nodeSelector: { } +# Node tolerations for operator pod assignment +# https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/ +tolerations: [ ] +# Topology spread constrains +# https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/ +topologySpreadConstraints: [ ] +operatorContainer: + jvmArgs: "-XX:+UseG1GC -Xms3G -Xmx3G -Dfile.encoding=UTF8" + env: + envFrom: + volumeMounts: { } + resources: +limits: + cpu: "1" + ephemeral-storage: 2Gi + memory: 4Gi +requests: + cpu: "1" + ephemeral-storage: 2Gi + memory: 4Gi + probes: +port: 18080 +livenessProbe: + periodSeconds: 10 + initialDelaySeconds: 30 +startupProbe: + failureThreshold: 30 + periodSeconds: 10 + metrics: +port: 19090 + securityContext: +allowPrivilegeEscalation: false +capabilities: + drop: +- ALL +runAsNonRoot: true +runAsUser: +seccompProfile: + type: RuntimeDefault +additionalContainers: { } +# additionalContainers: +# - name: "" +#image: "" Review Comment: Please remove 80 ~ 82 because we have line 79. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] Operator 0.1.0 [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #2: URL: https://github.com/apache/spark-kubernetes-operator/pull/2#discussion_r1572834233 ## .gitignore: ## @@ -16,3 +16,30 @@ build dependencies.lock **/dependencies.lock gradle/wrapper/gradle-wrapper.jar + +# Compiled source # +### +*.class +*.dll +*.exe +*.o +*.so +*.pyc + +# Packages # + +*.7z +*.dmg +*.gz +*.iso +*.rar +*.tar +*.zip + +# Logs and databases # +## +*.log Review Comment: This looks too broad. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] Operator 0.1.0 [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #2: URL: https://github.com/apache/spark-kubernetes-operator/pull/2#discussion_r1572833180 ## .github/workflows/build_and_test.yml: ## @@ -26,4 +26,20 @@ jobs: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} with: config: .github/.licenserc.yaml - + test_ci: +name: "Test CI" +runs-on: ubuntu-latest +strategy: + matrix: +java-version: [ 11, 17, 21 ] Review Comment: Please remove Java 11 and make a spin-off PR for this `build_and_test.yml`, @jiangzho . -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] Operator 0.1.0 [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #2: URL: https://github.com/apache/spark-kubernetes-operator/pull/2#discussion_r1572833649 ## .gitignore: ## @@ -16,3 +16,30 @@ build dependencies.lock **/dependencies.lock gradle/wrapper/gradle-wrapper.jar + +# Compiled source # +### +*.class +*.dll +*.exe +*.o +*.so +*.pyc + +# Packages # + +*.7z Review Comment: For my understanding, where did this come from? ## .gitignore: ## @@ -16,3 +16,30 @@ build dependencies.lock **/dependencies.lock gradle/wrapper/gradle-wrapper.jar + +# Compiled source # +### +*.class +*.dll +*.exe +*.o +*.so +*.pyc + +# Packages # + +*.7z +*.dmg +*.gz +*.iso +*.rar Review Comment: Ditto. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47909][PYTHON][CONNECT] Parent DataFrame class for Spark Connect and Spark Classic [spark]
HyukjinKwon commented on PR #46129: URL: https://github.com/apache/spark/pull/46129#issuecomment-2067140476 Will fix up the tests soon. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]
sahnib commented on code in PR #45977: URL: https://github.com/apache/spark/pull/45977#discussion_r1572618909 ## python/pyspark/sql/datasource.py: ## @@ -183,11 +186,40 @@ def streamWriter(self, schema: StructType, overwrite: bool) -> "DataSourceStream message_parameters={"feature": "streamWriter"}, ) +def _streamReader(self, schema: StructType) -> "DataSourceStreamReader": +try: +return self.streamReader(schema=schema) +except PySparkNotImplementedError: +return _SimpleStreamReaderWrapper(self.simpleStreamReader(schema=schema)) Review Comment: As we prefer `streamReader` over `simpleStreamReader`, can we call out in the docs that `streamReader` will be picked if the user has implemented both functions? ## python/pyspark/sql/datasource.py: ## @@ -469,6 +501,200 @@ def stop(self) -> None: ... +class SimpleInputPartition(InputPartition): +def __init__(self, start: dict, end: dict): +self.start = start +self.end = end + + +class PrefetchedCacheEntry(InputPartition): +def __init__(self, start: dict, end: dict, it: Iterator[Tuple]): +self.start = start +self.end = end +self.it = it + + +class SimpleDataSourceStreamReader(ABC): +""" +A base class for simplified streaming data source readers. +Compared to :class:`DataSourceStreamReader`, :class:`SimpleDataSourceStreamReader` doesn't +require planning data partitions. Also, the read API of :class:`SimpleDataSourceStreamReader` +allows reading data and planning the latest offset at the same time. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict or recursive dict whose key and value are primitive types, which include +Integer, String and Boolean. + +Examples + +>>> def initialOffset(self): +... return {"partition-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}} +""" +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "initialOffset"}, +) + +def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]: +""" +Read all available data from start offset and return the offset that next read attempt +starts from. + +Parameters +-- +start : dict +The start offset to start reading from. + +Returns +--- +A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s +The iterator contains all the available records after start offset. +The dict is the end offset of this read attempt and the start of next read attempt. +""" +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "read"}, +) + +def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]: +""" +Read all available data from specific start offset and end offset. +This is invoked during failure recovery to re-read a batch deterministically +in order to achieve exactly once. + +Parameters +-- +start : dict +The start offset to start reading from. + +end : dict +The offset where the reading stops. + +Returns +--- +iterator of :class:`Tuple`\\s +All the records between start offset and end offset.
+""" +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "read2"}, +) + +def commit(self, end: dict) -> None: +""" +Informs the source that Spark has completed processing all data for offsets less than or +equal to `end` and will only request offsets greater than `end` in the future. + +Parameters +-- +end : dict +The latest offset that the streaming query has processed for this source. +""" +... + + +class _SimpleStreamReaderWrapper(DataSourceStreamReader): +""" +A private class that wraps :class:`SimpleDataSourceStreamReader` in a prefetch-and-cache pattern, +so that :class:`SimpleDataSourceStreamReader` can integrate with the streaming engine like an +ordinary :class:`DataSourceStreamReader`. + +current_offset tracks the latest progress of the record prefetching; it is initialized to be +initialOffset() when the query starts for the first time, or to be the end offset of the last committed batch when the query restarts.
Re: [PR] [SPARK-47805][SS] Implementing TTL for MapState [spark]
ericm-db commented on PR #45991: URL: https://github.com/apache/spark/pull/45991#issuecomment-2067115942 @HeartSaVioR PTAL, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]
steveloughran commented on PR #45740: URL: https://github.com/apache/spark/pull/45740#issuecomment-2067069380 So both those bindings hand off to PathOutputCommitterFactory(), which looks for a committer from the config key mapreduce.outputcommitter.factory.class: * FileOutputCommitterFactory: classic committer * NamedCommitterFactory: class in mapreduce.outputcommitter.named.classname * then falls back to the mapreduce.outputcommitter.factory.scheme.SCHEMA factory definition. The idea being: you get an fs-specific one unless asked for. * the parquet one is there because parquet is fussy about its committer subclasses; that should be reviewed (where?) * and PathOutputCommitProtocol is probably surplus now that Spark can use PathOutputCommitter everywhere... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
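For context, a hedged sketch of how a job might opt into the S3A magic committer through this factory chain (the key and class names are taken from the Hadoop S3A committer and spark-hadoop-cloud documentation; verify them against your Hadoop and Spark versions before relying on them):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Select the magic committer inside the S3A committer factory.
    .config("spark.hadoop.fs.s3a.committer.name", "magic")
    .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
            "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
    # Spark-side bindings that route writes through PathOutputCommitter.
    .config("spark.sql.sources.commitProtocolClass",
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    .config("spark.sql.parquet.output.committer.class",
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
    .getOrCreate()
)
```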
[PR] [WIP][SPARK-47907] Put bang under a config [spark]
srielau opened a new pull request, #46138: URL: https://github.com/apache/spark/pull/46138 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47418][SQL] Add hand-crafted implementations for lowercase unicode-aware contains, startsWith and endsWith and optimize UTF8_BINARY_LCASE [spark]
vladimirg-db commented on code in PR #46082: URL: https://github.com/apache/spark/pull/46082#discussion_r1572665195 ## common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java: ## @@ -359,10 +414,97 @@ public boolean startsWith(final UTF8String prefix) { return matchAt(prefix, 0); } + /** + * Checks whether `prefix` is a prefix of `this` in a lowercase unicode-aware manner + * + * This function is written in a way which avoids excessive allocations in case we work with + * bare ASCII-character strings. + */ + public boolean startsWithInLowerCase(final UTF8String prefix) { +// No way to match sizes of strings for early return, since a single grapheme can be expanded +// into several independent ones in lowercase +if (prefix.numBytes == 0) { + return true; +} +if (numBytes == 0) { + return false; +} + +for (var i = 0; i < prefix.numBytes; i++) { Review Comment: I wanted to do something like this, but I think that this simple implementation is better for the following reasons: - We couldn't do something like `matchAt(...)` by using an offset in `this`, because we actually don't know the exact offset (at least without the full conversion to lowercase), since the lowercase representation of a unicode string can actually be longer (or shorter) than the original. That's why the prefix check loops forward and the suffix check loops backward - We probably could do a parametrized function, which does something like `startOffset + this.offset + direction * i`, which would work for the loop indexing, but I can't think of the suitable math for the inner `if` off the top of my head. It's probably possible, but would be much more involved, since that code differs significantly in terms of indexing - We could just substitute the loop body with the closure, and pass it as a strategy (since that's what differs), but introducing a looping virtual call in an optimization PR wouldn't be reasonable - It reads trivially, which is good for the reader - The copy-paste here is not that large -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
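The no-early-size-check point is easy to demonstrate: full Unicode case mapping can change the number of code points, as this small Python check shows ('İ', U+0130, lowercases to 'i' followed by a combining dot above):

```python
s = "\u0130"           # 'İ' LATIN CAPITAL LETTER I WITH DOT ABOVE
print(len(s))          # 1
print(len(s.lower()))  # 2: 'i' + U+0307 COMBINING DOT ABOVE
```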
Re: [PR] [SPARK-45265][SQL] Supporting Hive 4.0 Metastore [spark]
dongjoon-hyun closed pull request #45801: [SPARK-45265][SQL] Supporting Hive 4.0 Metastore URL: https://github.com/apache/spark/pull/45801 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47900] Fix check for implicit (UTF8_BINARY) collation [spark]
stefankandic commented on PR #46116: URL: https://github.com/apache/spark/pull/46116#issuecomment-2066908704 @cloud-fan all checks passing after restarting the failed job -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47418][SQL] Add hand-crafted implementations for lowercase unicode-aware contains, startsWith and endsWith and optimize UTF8_BINARY_LCASE [spark]
dbatomic commented on code in PR #46082: URL: https://github.com/apache/spark/pull/46082#discussion_r1572603566 ## common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java: ## @@ -359,10 +414,97 @@ public boolean startsWith(final UTF8String prefix) { return matchAt(prefix, 0); } + /** + * Checks whether `prefix` is a prefix of `this` in a lowercase unicode-aware manner + * + * This function is written in a way which avoids excessive allocations in case we work with + * bare ASCII-character strings. + */ + public boolean startsWithInLowerCase(final UTF8String prefix) { +// No way to match sizes of strings for early return, since a single grapheme can be expanded +// into several independent ones in lowercase +if (prefix.numBytes == 0) { + return true; +} +if (numBytes == 0) { + return false; +} + +for (var i = 0; i < prefix.numBytes; i++) { Review Comment: Would it be possible to share the implementation between startsWithInLowerCase and endsWithInLowerCase? E.g. if you were to use two variables (offset in prefix and offset in this) you should be able to share the code between these two. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47902][SQL]Making Compute Current Time* expressions foldable [spark]
dbatomic commented on code in PR #46120: URL: https://github.com/apache/spark/pull/46120#discussion_r1572580087 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala: ## @@ -811,4 +811,10 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite with SQLHelper with Quer "This should have been converted during analysis.")) ) } + + test("check that current time is foldable") { +val rnd = + Rand(Cast(UnixTimestamp(CurrentDate(), Literal("yyyy-MM-dd HH:mm:ss")), IntegerType)) Review Comment: oh, yes, sure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47825][DSTREAMS][3.5] Make `KinesisTestUtils` & `WriteInputFormatTestDataGenerator` deprecated [spark]
dongjoon-hyun commented on PR #46019: URL: https://github.com/apache/spark/pull/46019#issuecomment-2066830650 Merged to branch-3.5. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47825][DSTREAMS][3.5] Make `KinesisTestUtils` & `WriteInputFormatTestDataGenerator` deprecated [spark]
dongjoon-hyun commented on PR #46019: URL: https://github.com/apache/spark/pull/46019#issuecomment-2066829697 Thank you all! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47825][DSTREAMS][3.5] Make `KinesisTestUtils` & `WriteInputFormatTestDataGenerator` deprecated [spark]
dongjoon-hyun closed pull request #46019: [SPARK-47825][DSTREAMS][3.5] Make `KinesisTestUtils` & `WriteInputFormatTestDataGenerator` deprecated URL: https://github.com/apache/spark/pull/46019 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46632][SQL] Fix subexpression elimination when equivalent ternary expressions have different children [spark]
peter-toth commented on code in PR #46135: URL: https://github.com/apache/spark/pull/46135#discussion_r1572557495 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala: ## @@ -494,6 +494,18 @@ class SubexpressionEliminationSuite extends SparkFunSuite with ExpressionEvalHel checkShortcut(Or(equal, Literal(true)), 1) checkShortcut(Not(And(equal, Literal(false))), 1) } + + test("Equivalent ternary expressions have different children") { Review Comment: Hmm, this is because `((1 + 2) + 3)` semanticEquals to `((3 + 1) + 2)` but their children are different. So when we compute the common elements between the above 2 here: https://github.com/apache/spark/blob/0d553d06fe2f05571531ada0f08c0cc45418e941/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala#L112-L121 only `((1 + 2) + 3)` remains in `localEquivalenceMap` but its children don't. So later when we want to remove `((1 + 2) + 3)` we can't find its children... I think we could fix the above code that computes `localEquivalenceMap`, but the change in this PR is simpler and doesn't seem to do any harm. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47828][CONNECT][PYTHON][3.4] DataFrameWriterV2.overwrite fails with invalid plan [spark]
dongjoon-hyun closed pull request #46051: [SPARK-47828][CONNECT][PYTHON][3.4] DataFrameWriterV2.overwrite fails with invalid plan URL: https://github.com/apache/spark/pull/46051
Re: [PR] [SPARK-47915][BUILD][K8S] Upgrade `kubernetes-client` to 6.12.1 [spark]
dongjoon-hyun closed pull request #46137: [SPARK-47915][BUILD][K8S] Upgrade `kubernetes-client` to 6.12.1 URL: https://github.com/apache/spark/pull/46137
Re: [PR] [SPARK-47902][SQL]Making Compute Current Time* expressions foldable [spark]
cloud-fan commented on code in PR #46120: URL: https://github.com/apache/spark/pull/46120#discussion_r1572478404 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala: ## @@ -811,4 +811,10 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite with SQLHelper with Quer "This should have been converted during analysis.")) ) } + + test("check that current time is foldable") { +val rnd = + Rand(Cast(UnixTimestamp(CurrentDate(), Literal("yyyy-MM-dd HH:mm:ss")), IntegerType)) Review Comment: nit: we can simply do `Month(CurrentDate())` to turn the date into an int.
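For illustration, a sketch of the suggested simplification, assuming the Catalyst expression constructors (`Month` already yields an integer-typed expression, so the cast through `UnixTimestamp` is unnecessary):
```scala
import org.apache.spark.sql.catalyst.expressions.{CurrentDate, Month, Rand}

// Month(CurrentDate()) is IntegerType and foldable, so it can seed Rand directly.
val rnd = Rand(Month(CurrentDate()))
```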
Re: [PR] [WIP][SPARK-47913][CORE][SQL][DSTREAMS][TESTS] Move `src/test/java/test/*` to `src/test/java/*` [spark]
panbingkun closed pull request #46134: [WIP][SPARK-47913][CORE][SQL][DSTREAMS][TESTS] Move `src/test/java/test/*` to `src/test/java/*` URL: https://github.com/apache/spark/pull/46134
Re: [PR] [SPARK-47890][CONNECT][PYTHON] Add variant functions to Scala and Python. [spark]
chenhao-db commented on code in PR #46123: URL: https://github.com/apache/spark/pull/46123#discussion_r1572455657 ## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala: ## @@ -2485,6 +2485,30 @@ class PlanGenerationTestSuite fn.to_json(fn.col("d"), Map(("timestampFormat", "dd/MM/yyyy"))) } + functionTest("parse_json") { Review Comment: Done. Thanks for pointing it out!
Re: [PR] [SPARK-47414][SQL] Lowercase collation support for regexp expressions [spark]
nikolamand-db commented on code in PR #46077: URL: https://github.com/apache/spark/pull/46077#discussion_r1572379229 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollationRegexpExpressionSuite.scala: ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.util.CollationFactory +import org.apache.spark.sql.types._ + +class CollationRegexpExpressionSuite extends SparkFunSuite with ExpressionEvalHelper { Review Comment: Missed that `Expressions` plural. Maybe it would be even better to give them the same name; this is kind of confusing.
Re: [PR] [SPARK-47414][SQL] Lowercase collation support for regexp expressions [spark]
uros-db commented on code in PR #46077: URL: https://github.com/apache/spark/pull/46077#discussion_r1572369335 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollationRegexpExpressionSuite.scala: ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.util.CollationFactory +import org.apache.spark.sql.types._ + +class CollationRegexpExpressionSuite extends SparkFunSuite with ExpressionEvalHelper { Review Comment: well, that is a different kind of suite (`QueryTest`), used for sql tests, so I put it in the sql/ package, whereas this one is for unit testing (`SparkFunSuite`) and has to be in the expressions/ package. I think they should indeed stay separated. btw the names are slightly different: .../expressions/CollationRegexpExpressionSuite.scala vs sql/CollationRegexpExpressionsSuite.scala
Re: [PR] [SPARK-47297][SQL] Add collations support to split regex expression [spark]
nikolamand-db closed pull request #45856: [SPARK-47297][SQL] Add collations support to split regex expression URL: https://github.com/apache/spark/pull/45856
Re: [PR] [SPARK-47297][SQL] Add collations support to split regex expression [spark]
nikolamand-db commented on PR #45856: URL: https://github.com/apache/spark/pull/45856#issuecomment-2066549783 Closing as we have a new approach for all regex functions: https://github.com/apache/spark/pull/46077.
Re: [PR] [SPARK-47414][SQL] Lowercase collation support for regexp expressions [spark]
nikolamand-db commented on code in PR #46077: URL: https://github.com/apache/spark/pull/46077#discussion_r1572342093 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollationRegexpExpressionSuite.scala: ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.util.CollationFactory +import org.apache.spark.sql.types._ + +class CollationRegexpExpressionSuite extends SparkFunSuite with ExpressionEvalHelper { + + test("Like/ILike/RLike expressions with collated strings") { +case class LikeTestCase[R](l: String, regexLike: String, regexRLike: String, collation: String, + expectedLike: R, expectedILike: R, expectedRLike: R) +val testCases = Seq( + LikeTestCase("AbC", "%AbC%", ".b.", "UTF8_BINARY", true, true, true), + LikeTestCase("AbC", "%ABC%", ".B.", "UTF8_BINARY", false, true, false), + LikeTestCase("AbC", "%abc%", ".b.", "UTF8_BINARY_LCASE", true, true, true), + LikeTestCase("", "", "", "UTF8_BINARY_LCASE", true, true, true), + LikeTestCase("Foo", "", "", "UTF8_BINARY_LCASE", false, false, true), + LikeTestCase("", "%foo%", ".o.", "UTF8_BINARY_LCASE", false, false, false), + LikeTestCase("AbC", "%ABC%", ".B.", "UNICODE", false, true, false), + LikeTestCase(null, "%foo%", ".o.", "UNICODE", null, null, null), + LikeTestCase("Foo", null, null, "UNICODE", null, null, null), + LikeTestCase(null, null, null, "UNICODE", null, null, null) +) +testCases.foreach(t => { + // Like + checkEvaluation(Like( +Literal.create(t.l, StringType(CollationFactory.collationNameToId(t.collation))), +Literal.create(t.regexLike, StringType), '\\'), t.expectedLike) + // ILike + checkEvaluation(ILike( +Literal.create(t.l, StringType(CollationFactory.collationNameToId(t.collation))), +Literal.create(t.regexLike, StringType), '\\').replacement, t.expectedILike) + // RLike + checkEvaluation(RLike( +Literal.create(t.l, StringType(CollationFactory.collationNameToId(t.collation))), +Literal.create(t.regexRLike, StringType)), t.expectedRLike) +}) + } + + test("StringSplit expression with collated strings") { +case class StringSplitTestCase[R](s: String, r: String, collation: String, expected: R) +val testCases = Seq( + StringSplitTestCase("1A2B3C", "[ABC]", "UTF8_BINARY", Seq("1", "2", "3", "")), + StringSplitTestCase("1A2B3C", "[abc]", "UTF8_BINARY", Seq("1A2B3C")), + StringSplitTestCase("1A2B3C", "[ABC]", "UTF8_BINARY_LCASE", Seq("1", "2", "3", "")), + StringSplitTestCase("1A2B3C", "[abc]", "UTF8_BINARY_LCASE", Seq("1", "2", "3", "")), + StringSplitTestCase("1A2B3C", "[1-9]+", "UNICODE", Seq("", "A", "B", "C")), + StringSplitTestCase("", "", "UNICODE", 
Seq("")), + StringSplitTestCase("1A2B3C", "", "UNICODE", Seq("1", "A", "2", "B", "3", "C")), + StringSplitTestCase("", "[1-9]+", "UNICODE", Seq("")), + StringSplitTestCase(null, "[1-9]+", "UNICODE", null), + StringSplitTestCase("1A2B3C", null, "UNICODE", null), + StringSplitTestCase(null, null, "UNICODE", null) +) +testCases.foreach(t => { + // StringSplit + checkEvaluation(StringSplit( +Literal.create(t.s, StringType(CollationFactory.collationNameToId(t.collation))), +Literal.create(t.r, StringType), -1), t.expected) +}) + } + + test("Regexp expressions with collated strings") { +case class RegexpTestCase[R](l: String, r: String, collation: String, + expectedExtract: R, expectedExtractAll: R, expectedCount: R) Review Comment: ditto
Re: [PR] [SPARK-47545][CONNECT] Dataset `observe` support for the Scala client [spark]
xupefei commented on code in PR #45701: URL: https://github.com/apache/spark/pull/45701#discussion_r1572265083 ## connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala: ## @@ -27,18 +27,22 @@ import org.apache.arrow.vector.ipc.message.{ArrowMessage, ArrowRecordBatch} import org.apache.arrow.vector.types.pojo import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.ExecutePlanResponse.ObservedMetrics +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, RowEncoder} import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ProductEncoder, UnboundRowEncoder} +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.connect.client.arrow.{AbstractMessageIterator, ArrowDeserializingIterator, ConcatenatingArrowStreamReader, MessageIterator} -import org.apache.spark.sql.connect.common.DataTypeProtoConverter +import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, LiteralValueProtoConverter} import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.util.ArrowUtils private[sql] class SparkResult[T]( responses: CloseableIterator[proto.ExecutePlanResponse], allocator: BufferAllocator, encoder: AgnosticEncoder[T], -timeZoneId: String) +timeZoneId: String, +setObservationMetricsOpt: Option[(Long, Option[Map[String, Any]]) => Unit] = None) Review Comment: True. Removed.
Re: [PR] [SPARK-47545][CONNECT] Dataset `observe` support for the Scala client [spark]
xupefei commented on code in PR #45701: URL: https://github.com/apache/spark/pull/45701#discussion_r1572264833 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -813,6 +823,28 @@ class SparkSession private[sql] ( * Set to false to prevent client.releaseSession on close() (testing only) */ private[sql] var releaseSessionOnClose = true + + private[sql] def registerObservation(planId: Long, observation: Observation): Unit = { +// makes this class thread-safe: +// only the first thread entering this block can set sparkSession +// all other threads will see the exception, as it is only allowed to do this once +observation.synchronized { + if (observationRegistry.contains(planId)) { +throw new IllegalArgumentException("An Observation can be used with a Dataset only once") + } + observationRegistry.put(planId, observation) +} + } + + private[sql] def setMetricsAndUnregisterObservation( + planId: Long, + metrics: Option[Map[String, Any]]): Unit = { +observationRegistry.get(planId).map { observation => + if (observation.setMetricsAndNotify(metrics)) { +observationRegistry.remove(planId) Review Comment: But nevertheless we don't need to handle this case in Connect because we look up using Plan Id.
Re: [PR] [SPARK-47545][CONNECT] Dataset `observe` support for the Scala client [spark]
xupefei commented on code in PR #45701: URL: https://github.com/apache/spark/pull/45701#discussion_r1572263903 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -813,6 +823,28 @@ class SparkSession private[sql] ( * Set to false to prevent client.releaseSession on close() (testing only) */ private[sql] var releaseSessionOnClose = true + + private[sql] def registerObservation(planId: Long, observation: Observation): Unit = { +// makes this class thread-safe: +// only the first thread entering this block can set sparkSession +// all other threads will see the exception, as it is only allowed to do this once +observation.synchronized { + if (observationRegistry.contains(planId)) { +throw new IllegalArgumentException("An Observation can be used with a Dataset only once") + } + observationRegistry.put(planId, observation) +} + } + + private[sql] def setMetricsAndUnregisterObservation( + planId: Long, + metrics: Option[Map[String, Any]]): Unit = { +observationRegistry.get(planId).map { observation => + if (observation.setMetricsAndNotify(metrics)) { +observationRegistry.remove(planId) Review Comment: I looked at the code. It seems it's valid to check for non-empty metrics in Spark Core: ```scala private[spark] def onFinish(qe: QueryExecution): Unit = { ... val row: Option[Row] = qe.observedMetrics.get(name) val metrics: Option[Map[String, Any]] = row.map(r => r.getValuesMap[Any](r.schema.fieldNames.toImmutableArraySeq)) if (setMetricsAndNotify(metrics)) { unregister() } } ``` The option is for the case when the query finishes without metrics. Is this possible?
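For readers following along, a quick sketch of the public API this thread is wiring up (`df` is an assumed DataFrame with an `id` column; `observe` and `Observation.get` are the documented Spark entry points):
```scala
import org.apache.spark.sql.Observation
import org.apache.spark.sql.functions._

val observation = Observation("my_metrics")
// observe() attaches named aggregates that are computed alongside the query
val observed = df.observe(observation, count(lit(1)).as("rows"), max(col("id")).as("max_id"))
observed.collect()            // running an action populates the metrics
val metrics = observation.get // blocks until the metrics are available
```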
Re: [PR] [SPARK-47412][SQL] Add Collation Support for LPad/RPad. [spark]
uros-db commented on code in PR #46041: URL: https://github.com/apache/spark/pull/46041#discussion_r1572231615 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CollationTypeCasts.scala: ## @@ -54,7 +54,7 @@ object CollationTypeCasts extends TypeCoercionRule { case otherExpr @ ( _: In | _: InSubquery | _: CreateArray | _: ArrayJoin | _: Concat | _: Greatest | _: Least | - _: Coalesce | _: BinaryExpression | _: ConcatWs) => + _: Coalesce | _: BinaryExpression | _: ConcatWs | _: StringLPad | _: StringRPad) => Review Comment: unfortunately, `CollationTypeCasts` doesn't quite do what you would expect based on those descriptions - we're working on fixing that. But if you were to run this: ``` test("Support StringLPad string expressions with collation") { val query = "SELECT lpad('abc', collate('5', 'unicode_ci'), ' ')" checkAnswer(sql(query), Row(" abc")) assert(sql(query).schema.fields.head.dataType.sameType(StringType(3))) } ``` you would find that the resulting datatype is UNICODE_CI (instead of UTF8_BINARY, which should be the correct type). The reason for this behaviour lies in the ordering of rules in `TypeCoercion` - there you'll find that, for example, `CollationTypeCasts` comes before `FunctionArgumentConversion`, so in the above example the collation is taken from `'5'` before it's converted into the integer 5.
Re: [PR] [SPARK-47412][SQL] Add Collation Support for LPad/RPad. [spark]
uros-db commented on code in PR #46041: URL: https://github.com/apache/spark/pull/46041#discussion_r1572232587 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CollationTypeCasts.scala: ## @@ -54,7 +54,7 @@ object CollationTypeCasts extends TypeCoercionRule { case otherExpr @ ( _: In | _: InSubquery | _: CreateArray | _: ArrayJoin | _: Concat | _: Greatest | _: Least | - _: Coalesce | _: BinaryExpression | _: ConcatWs) => + _: Coalesce | _: BinaryExpression | _: ConcatWs | _: StringLPad | _: StringRPad) => Review Comment: so for now, we should only call `collateToSingleType` on the correct parameters - in this case, the first and third.
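A sketch of what restricting the cast to the first and third parameters could look like, written as a hypothetical helper rather than the PR's actual diff (it assumes the `StringLPad(str, len, pad)` constructor shape; `collateToSingleType` is passed in to stand for the rule's real helper):
```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, StringLPad}

// Unify collation only across the string arguments (str and pad);
// the integer length argument is left untouched.
def unifyLPadCollation(
    lpad: StringLPad,
    collateToSingleType: Seq[Expression] => Seq[Expression]): Expression = {
  val Seq(newStr, newPad) = collateToSingleType(Seq(lpad.str, lpad.pad))
  lpad.withNewChildren(Seq(newStr, lpad.len, newPad))
}
```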
Re: [PR] [SPARK-47915][BUILD][K8S] Upgrade `kubernetes-client` to 6.12.1 [spark]
bjornjorgensen commented on PR #46137: URL: https://github.com/apache/spark/pull/46137#issuecomment-2066277141 ![image](https://github.com/apache/spark/assets/47577197/a166f540-9254-4614-af83-6848206e6643) @dongjoon-hyun FYI
Re: [PR] [SPARK-47545][CONNECT] Dataset `observe` support for the Scala client [spark]
xupefei commented on code in PR #45701: URL: https://github.com/apache/spark/pull/45701#discussion_r1572148735 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -813,6 +823,28 @@ class SparkSession private[sql] ( * Set to false to prevent client.releaseSession on close() (testing only) */ private[sql] var releaseSessionOnClose = true + + private[sql] def registerObservation(planId: Long, observation: Observation): Unit = { +// makes this class thread-safe: +// only the first thread entering this block can set sparkSession +// all other threads will see the exception, as it is only allowed to do this once +observation.synchronized { + if (observationRegistry.contains(planId)) { +throw new IllegalArgumentException("An Observation can be used with a Dataset only once") + } + observationRegistry.put(planId, observation) +} + } + + private[sql] def setMetricsAndUnregisterObservation( + planId: Long, + metrics: Option[Map[String, Any]]): Unit = { +observationRegistry.get(planId).map { observation => + if (observation.setMetricsAndNotify(metrics)) { +observationRegistry.remove(planId) Review Comment: I had the same question when I looked at the code. In Spark Core we only de-register the Observation when some non-empty metrics are set, so I decided to keep it the same in Connect. I am not sure under which circumstance the metrics can be empty.
[PR] [SPARK-47915][BUILD][K8S] Upgrade `kubernetes-client` to 6.12.1 [spark]
bjornjorgensen opened a new pull request, #46137: URL: https://github.com/apache/spark/pull/46137 ### What changes were proposed in this pull request? Upgrade `kubernetes-client` from 6.12.0 to 6.12.1 ### Why are the changes needed? [Release notes](https://github.com/fabric8io/kubernetes-client/releases/tag/v6.12.1) ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA ### Was this patch authored or co-authored using generative AI tooling? No.
Re: [PR] [SPARK-47545][CONNECT] Dataset `observe` support for the Scala client [spark]
xupefei commented on code in PR #45701: URL: https://github.com/apache/spark/pull/45701#discussion_r1572146508 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -813,6 +823,28 @@ class SparkSession private[sql] ( * Set to false to prevent client.releaseSession on close() (testing only) */ private[sql] var releaseSessionOnClose = true + + private[sql] def registerObservation(planId: Long, observation: Observation): Unit = { +// makes this class thread-safe: +// only the first thread entering this block can set sparkSession +// all other threads will see the exception, as it is only allowed to do this once +observation.synchronized { Review Comment: Agreed. I changed it to a concurrent hash map.
Re: [PR] [SPARK-47412][SQL] Add Collation Support for LPad/RPad. [spark]
GideonPotok commented on code in PR #46041: URL: https://github.com/apache/spark/pull/46041#discussion_r1572135059 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CollationTypeCasts.scala: ## @@ -54,7 +54,7 @@ object CollationTypeCasts extends TypeCoercionRule { case otherExpr @ ( _: In | _: InSubquery | _: CreateArray | _: ArrayJoin | _: Concat | _: Greatest | _: Least | - _: Coalesce | _: BinaryExpression | _: ConcatWs) => + _: Coalesce | _: BinaryExpression | _: ConcatWs | _: StringLPad | _: StringRPad) => Review Comment: @uros-db That is not so, in my understanding. `collateToSingleType` calls `getOutputCollation` and then does `exprs.map(e => castStringType(e, st).getOrElse(e))`. `getOutputCollation`, according to its comment block (verified by reading the code), "will only be affected by collated StringTypes or complex DataTypes with collated StringTypes (e.g. ArrayType)". `castStringType`, according to its comment block (verified by reading the code), "casts given expression to collated StringType with id equal to collationId only if expression has StringType in the first place."
Re: [PR] [SPARK-47906][PYTHON][DOCS] Fix docstring and type hint of `hll_union_agg` [spark]
zhengruifeng commented on PR #46128: URL: https://github.com/apache/spark/pull/46128#issuecomment-2066182638 thanks @HyukjinKwon, merged to master
Re: [PR] [SPARK-47906][PYTHON][DOCS] Fix docstring and type hint of `hll_union_agg` [spark]
zhengruifeng closed pull request #46128: [SPARK-47906][PYTHON][DOCS] Fix docstring and type hint of `hll_union_agg` URL: https://github.com/apache/spark/pull/46128
Re: [PR] [SPARK-46632][SQL] Fix subexpression elimination when equivalent ternary expressions have different children [spark]
zml1206 commented on PR #46135: URL: https://github.com/apache/spark/pull/46135#issuecomment-2066175301 cc @peter-toth @cloud-fan
[PR] [SPARK-47914][SQL] Do not display the splits parameter in Range [spark]
guixiaowen opened a new pull request, #46136: URL: https://github.com/apache/spark/pull/46136 ### What changes were proposed in this pull request? [SQL] `explain extended select * from range(0, 4);` Before this PR, `splits` is displayed as `None` in the logical plan phases even when it is not set:
```
== Parsed Logical Plan ==
'Project [*]
+- 'UnresolvedTableValuedFunction [range], [0, 4]

== Analyzed Logical Plan ==
id: bigint
Project [id#11L]
+- Range (0, 4, step=1, splits=None)

== Optimized Logical Plan ==
Range (0, 4, step=1, splits=None)

== Physical Plan ==
*(1) Range (0, 4, step=1, splits=1)
```
After this PR, `splits` is not displayed in the logical plan phases when it is not set, and it is still displayed when it is set:
```
== Parsed Logical Plan ==
'Project [*]
+- 'UnresolvedTableValuedFunction [range], [0, 4]

== Analyzed Logical Plan ==
id: bigint
Project [id#11L]
+- Range (0, 4, step=1)

== Optimized Logical Plan ==
Range (0, 4, step=1)

== Physical Plan ==
*(1) Range (0, 4, step=1, splits=1)
```
### Why are the changes needed? If `splits` is not set, it is still displayed as `None` in the logical plan phases, which is not very user-friendly. ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
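A minimal sketch of the rendering change, assuming the logical `Range` node's `numSlices: Option[Int]` field backs the `splits` output (not necessarily the PR's exact diff):
```scala
// in org.apache.spark.sql.catalyst.plans.logical.Range
override def simpleString(maxFields: Int): String = {
  // render "splits=n" only when the value was explicitly set
  val splitsInfo = numSlices.map(n => s", splits=$n").getOrElse("")
  s"Range ($start, $end, step=$step$splitsInfo)"
}
```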
Re: [PR] [SPARK-47911][SQL] Introduces a universal BinaryFormatter to make binary output consistent [spark]
yaooqinn commented on code in PR #46133: URL: https://github.com/apache/spark/pull/46133#discussion_r1572038458 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ToStringBase.scala: ## @@ -414,3 +413,24 @@ trait ToStringBase { self: UnaryExpression with TimeZoneAwareExpression => """.stripMargin } } + +object ToStringBase { + type BinaryFormatter = Array[Byte] => UTF8String + + def getBinaryFormatter: BinaryFormatter = { +val style = SQLConf.get.getConf(SQLConf.BINARY_OUTPUT_STYLE) +BinaryOutputStyle.withName(style) match { + case BinaryOutputStyle.UTF8 => +array => UTF8String.fromBytes(array) + case BinaryOutputStyle.BASIC => array => +UTF8String.fromString(array.mkString("[", ", ", "]")) + case BinaryOutputStyle.BASE64 => array => + UTF8String.fromString(java.util.Base64.getEncoder.withoutPadding().encodeToString(array)) Review Comment: https://github.com/apache/hive/pull/1261/files#diff-107090e9a0306b19931b1b61f076c2a66afa7cfb1e49fea8624ef950ceb170d0R167 As we already support hive-service-rpc 4.0.0, I added this style too.
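A quick illustration of the `withoutPadding()` difference in that branch, using only the JDK's `Base64` (no Spark dependency):
```scala
val bytes = Array[Byte](1, 2, 3, 4)
java.util.Base64.getEncoder.encodeToString(bytes)                  // "AQIDBA=="
java.util.Base64.getEncoder.withoutPadding().encodeToString(bytes) // "AQIDBA", no trailing padding
```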
[PR] Fix subexpression elimination when equivalent ternary expressions have different children [spark]
zml1206 opened a new pull request, #46135: URL: https://github.com/apache/spark/pull/46135 ### What changes were proposed in this pull request? Remove the unexpected exception thrown in `EquivalentExpressions.updateExprInMap()`. Equivalent expressions may have different children, so it can happen that an expression is not in the map while its useCount is -1. For example, before this PR the following would throw an IllegalStateException: ``` Seq((1, 2, 3), (2, 3, 4)).toDF("a", "b", "c") .selectExpr("case when a + b + c>3 then 1 when c + a + b>0 then 2 else 0 end as d").show() ``` ### Why are the changes needed? Fix a bug. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New unit test; before this PR it would throw `IllegalStateException: *** with use count: -1`. ### Was this patch authored or co-authored using generative AI tooling? No.
[PR] [WIP] Move `src/test/java/test/*` to `src/test/java/*` [spark]
panbingkun opened a new pull request, #46134: URL: https://github.com/apache/spark/pull/46134 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-47909][PYTHON][CONNECT] Parent DataFrame class for Spark Connect and Spark Classic [spark]
HyukjinKwon commented on code in PR #46129: URL: https://github.com/apache/spark/pull/46129#discussion_r1572006465 ## python/pyspark/sql/connect/dataframe.py: ## @@ -2306,7 +2183,7 @@ def _test() -> None: ) (failure_count, test_count) = doctest.testmod( -pyspark.sql.connect.dataframe, +pyspark.sql.dataframe, Review Comment: It does inherit the docstrings, but `doctest` cannot pick up inherited ones. So I manually switched it to `pyspark.sql.dataframe` to run the doctests.
Re: [PR] [DRAFT][SPARK-47414][SQL] Lowercase collation support for regexp expressions [spark]
uros-db commented on code in PR #46077: URL: https://github.com/apache/spark/pull/46077#discussion_r1572003918 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollationExpressionSuite.scala: ## @@ -161,4 +162,40 @@ class CollationExpressionSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(ArrayExcept(left, right), out) } } + + test("MultiLikeBase regexp expressions with collated strings") { Review Comment: Agreed. Will definitely need to move this somewhere