[PR] [WIP][SPARK-47604][CORE] Resource managers: Migrate logInfo with variables to structured logging framework [spark]
panbingkun opened a new pull request, #46130:
URL: https://github.com/apache/spark/pull/46130

(no comment)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47890][CONNECT][PYTHON] Add variant functions to Scala and Python. [spark]
chenhao-db commented on code in PR #46123:
URL: https://github.com/apache/spark/pull/46123#discussion_r1571848958

## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
@@ -6975,6 +6975,71 @@ object functions {
    */
   def parse_json(json: Column): Column = Column.fn("parse_json", json)
+
+  /**
+   * Check if a variant value is a variant null. Returns true if and only if the input is a
+   * variant null and false otherwise (including in the case of SQL NULL).
+   *
+   * @param v
+   *   a variant column.
+   * @group variant_funcs

Review Comment: Added. Please take a look.
Re: [PR] [SPARK-47909][PYTHON][CONNECT] Parent DataFrame class for Spark Connect and Spark Classic [spark]
HyukjinKwon commented on PR #46129:
URL: https://github.com/apache/spark/pull/46129#issuecomment-2065807249

cc @ueshin @zhengruifeng @allisonwang-db @xinrong-meng @itholic @hvanhovell @grundprinzip
[PR] [SPARK-47909][PYTHON][CONNECT] Parent DataFrame class for Spark Connect and Spark Classic [spark]
HyukjinKwon opened a new pull request, #46129:
URL: https://github.com/apache/spark/pull/46129

### What changes were proposed in this pull request?

This PR proposes to have a parent `pyspark.sql.DataFrame` class which `pyspark.sql.connect.dataframe.DataFrame` and `pyspark.sql.classic.dataframe.DataFrame` inherit.

**Before**
1. `pyspark.sql.DataFrame` (Spark Classic)
   - docstrings
   - Spark Classic logic
2. `pyspark.sql.connect.dataframe.DataFrame` (Spark Connect)
   - Spark Connect logic
3. Users can only see the type hints from `pyspark.sql.DataFrame`.

**After**
1. `pyspark.sql.DataFrame` (Common)
   - docstrings
   - Support classmethod usages (dispatch to either Spark Connect or Spark Classic)
2. `pyspark.sql.classic.dataframe.DataFrame` (Spark Classic)
   - Spark Classic logic
3. `pyspark.sql.connect.dataframe.DataFrame` (Spark Connect)
   - Spark Connect logic
4. Users can only see the type hints from `pyspark.sql.DataFrame`.

### Why are the changes needed?

This fixes two issues in the current structure of Spark Connect:

1. Support usage of regular methods as class methods, e.g.:

```python
from pyspark.sql import DataFrame

df = spark.range(10)
DataFrame.union(df, df)
```

**Before**

```
Traceback (most recent call last):
  File "", line 1, in
  File "/.../spark/python/pyspark/sql/dataframe.py", line 4809, in union
    return DataFrame(self._jdf.union(other._jdf), self.sparkSession)
  File "/.../spark/python/pyspark/sql/connect/dataframe.py", line 1724, in __getattr__
    raise PySparkAttributeError(
pyspark.errors.exceptions.base.PySparkAttributeError: [JVM_ATTRIBUTE_NOT_SUPPORTED] Attribute `_jdf` is not supported in Spark Connect as it depends on the JVM. If you need to use this attribute, do not use Spark Connect when creating your session. Visit https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession for creating regular Spark Session in detail.
```

**After**

```
DataFrame[id: bigint]
```

2. Supports `isinstance` calls:

```python
from pyspark.sql import DataFrame

isinstance(spark.range(1), DataFrame)
```

**Before**

```
False
```

**After**

```
True
```

### Does this PR introduce _any_ user-facing change?

Yes, as described above.

### How was this patch tested?

Manually tested, and CI should verify them.

### Was this patch authored or co-authored using generative AI tooling?

No.
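The dispatch the PR describes can be sketched in a few lines of plain Python. This is a toy illustration of the pattern only; `Frame`, `ClassicFrame`, and `ConnectFrame` are hypothetical stand-ins for the real `pyspark.sql` classes:

```python
class Frame:
    """Common parent: carries the docstrings/type hints and dispatches."""

    def __init__(self, data):
        self.data = data

    def union(self, other):
        # Documented once here; the actual work is done by whichever
        # backend subclass `self` happens to be at runtime.
        return self._union(other)

    def _union(self, other):
        raise NotImplementedError


class ClassicFrame(Frame):
    def _union(self, other):
        # Stand-in for the JVM-backed implementation.
        return ClassicFrame(self.data + other.data)


class ConnectFrame(Frame):
    def _union(self, other):
        # Stand-in for the Spark Connect implementation.
        return ConnectFrame(self.data + other.data)


df = ClassicFrame([1, 2])
# Class-method style call on the parent now dispatches correctly...
out = Frame.union(df, df)
# ...and isinstance against the parent succeeds for any backend.
assert isinstance(df, Frame) and isinstance(out, ClassicFrame)
assert out.data == [1, 2, 1, 2]
```

The key point is that `Frame.union(df, df)` resolves `_union` through `df`'s own class, so both issues (unbound-style calls and `isinstance`) fall out of ordinary inheritance.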
Re: [PR] [SPARK-47890][CONNECT][PYTHON] Add variant functions to Scala and Python. [spark]
LuciferYang commented on code in PR #46123:
URL: https://github.com/apache/spark/pull/46123#discussion_r1571817704

## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
@@ -6975,6 +6975,71 @@ object functions {
    */
   def parse_json(json: Column): Column = Column.fn("parse_json", json)
+
+  /**
+   * Check if a variant value is a variant null. Returns true if and only if the input is a
+   * variant null and false otherwise (including in the case of SQL NULL).
+   *
+   * @param v
+   *   a variant column.
+   * @group variant_funcs

Review Comment: We should add relevant test cases in PlanGenerationTestSuite for `connect`
Re: [PR] [SPARK-47901][BUILD] Upgrade common-text 1.12.0 [spark]
LuciferYang commented on PR #46127:
URL: https://github.com/apache/spark/pull/46127#issuecomment-2065783803

Merged into master for Spark 4.0, thanks @yaooqinn ~
Re: [PR] [SPARK-47901][BUILD] Upgrade common-text 1.12.0 [spark]
LuciferYang closed pull request #46127: [SPARK-47901][BUILD] Upgrade common-text 1.12.0
URL: https://github.com/apache/spark/pull/46127
Re: [PR] [SPARK-47887][CONNECT] Remove unused import `spark/connect/common.proto` from `spark/connect/relations.proto` [spark]
LuciferYang commented on PR #46106:
URL: https://github.com/apache/spark/pull/46106#issuecomment-2065780193

Thanks @dongjoon-hyun
Re: [PR] [SPARK-47890][CONNECT][PYTHON] Add variant functions to Scala and Python. [spark]
chenhao-db commented on code in PR #46123:
URL: https://github.com/apache/spark/pull/46123#discussion_r1571796823

## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
@@ -6975,6 +6975,71 @@ object functions {
    */
   def parse_json(json: Column): Column = Column.fn("parse_json", json)
+
+  /**
+   * Check if a variant value is a variant null. Returns true if and only if the input is a
+   * variant null and false otherwise (including in the case of SQL NULL).
+   *
+   * @param v
+   *   a variant column.
+   * @group variant_funcs

Review Comment: Actually, we do have this group: https://github.com/apache/spark/blob/fe47edece059e9189d8500b3c9b3881b44678785/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala#L54. `parse_json` in `functions.py/scala` was incorrectly marked `json_funcs`; I also fixed it by the way.
Re: [PR] [SPARK-47890][CONNECT][PYTHON] Add variant functions to Scala and Python. [spark]
chenhao-db commented on PR #46123:
URL: https://github.com/apache/spark/pull/46123#issuecomment-2065768058

@zhengruifeng Thanks! Please take another look.
Re: [PR] [SPARK-47890][CONNECT][PYTHON] Add variant functions to Scala and Python. [spark]
chenhao-db commented on code in PR #46123:
URL: https://github.com/apache/spark/pull/46123#discussion_r1571795604

## python/docs/source/reference/pyspark.sql/functions.rst:
@@ -533,6 +533,11 @@ JSON Functions
     json_object_keys
     json_tuple
     parse_json
+    is_variant_null

Review Comment: Done. It looks like `functions.py/scala` is not strictly sorted/grouped, so I only made `functions.rst` sorted.
Re: [PR] [SPARK-47596][DSTREAMS] Streaming: Migrate logWarn with variables to structured logging framework [spark]
gengliangwang closed pull request #46079: [SPARK-47596][DSTREAMS] Streaming: Migrate logWarn with variables to structured logging framework
URL: https://github.com/apache/spark/pull/46079
Re: [PR] [SPARK-47596][DSTREAMS] Streaming: Migrate logWarn with variables to structured logging framework [spark]
gengliangwang commented on PR #46079:
URL: https://github.com/apache/spark/pull/46079#issuecomment-2065758227

@panbingkun Thanks again for the work! Merging to master.
Re: [PR] [SPARK-47596][DSTREAMS] Streaming: Migrate logWarn with variables to structured logging framework [spark]
gengliangwang commented on code in PR #46079:
URL: https://github.com/apache/spark/pull/46079#discussion_r1571792309

## streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala:
@@ -247,7 +247,8 @@ private[streaming] class ReceivedBlockTracker(
       true
     } catch {
       case NonFatal(e) =>
-        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
+        logWarning(log"Exception thrown while writing record: " +
+          log"${MDC(RECEIVED_BLOCK_TRACKER_LOG_EVENT, record)} to the WriteAheadLog.", e)

Review Comment: Using the class name `RECEIVED_BLOCK_TRACKER_LOG_EVENT` is also fine
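The idea behind the migration in this diff can be illustrated in plain Python (a toy model only, not Spark's actual Scala `log"..."`/`MDC` API): instead of interpolating the variable into the string, the message keeps the readable text while the value also travels under a stable key, so log aggregators can query it. The key name mirrors the one in the diff.

```python
def mdc(key: str, value: object) -> dict:
    """Pair a value with its stable MDC key."""
    return {"key": key, "value": value}


def log_warning(*parts) -> dict:
    """Build a structured log entry from string fragments and MDC pairs."""
    message, context = [], {}
    for part in parts:
        if isinstance(part, dict):      # an MDC pair
            message.append(str(part["value"]))
            context[part["key"]] = part["value"]
        else:                           # a plain string fragment
            message.append(part)
    return {"message": "".join(message), "context": context}


entry = log_warning(
    "Exception thrown while writing record: ",
    mdc("RECEIVED_BLOCK_TRACKER_LOG_EVENT", "BlockAdditionEvent(...)"),
    " to the WriteAheadLog.",
)
# entry["message"] reads like the old interpolated log line, while
# entry["context"] exposes the variable under its key for aggregators.
```

The rendered message is unchanged for humans; the gain is the machine-queryable key/value context alongside it.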
Re: [PR] [SPARK-47906][PYTHON][DOCS] Fix docstring and type hint of `hll_union_agg` [spark]
zhengruifeng commented on code in PR #46128:
URL: https://github.com/apache/spark/pull/46128#discussion_r1571771431

## python/pyspark/sql/connect/functions/builtin.py:
@@ -3775,16 +3775,14 @@ def hll_sketch_agg(col: "ColumnOrName", lgConfigK: Optional[Union[int, Column]]
 hll_sketch_agg.__doc__ = pysparkfuncs.hll_sketch_agg.__doc__

-def hll_union_agg(col: "ColumnOrName", allowDifferentLgConfigK: Optional[bool] = None) -> Column:
+def hll_union_agg(
+    col: "ColumnOrName",
+    allowDifferentLgConfigK: Optional[Union[bool, Column]] = None,
+) -> Column:
     if allowDifferentLgConfigK is None:
         return _invoke_function_over_columns("hll_union_agg", col)
     else:
-        _allowDifferentLgConfigK = (
-            lit(allowDifferentLgConfigK)
-            if isinstance(allowDifferentLgConfigK, bool)
-            else allowDifferentLgConfigK
-        )
-        return _invoke_function_over_columns("hll_union_agg", col, _allowDifferentLgConfigK)
+        return _invoke_function_over_columns("hll_union_agg", col, lit(allowDifferentLgConfigK))

Review Comment: function `lit` accepts both `bool` and `Column`
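The reviewer's point can be sketched with toy stand-ins (the `Column` class and `lit` function below are hypothetical simplifications, not the PySpark ones): when a `lit`-style helper passes already-wrapped columns through unchanged and wraps everything else, the caller's `isinstance` branch becomes unnecessary.

```python
class Column:
    """Toy stand-in for a pyspark Column."""
    def __init__(self, expr):
        self.expr = expr


def lit(value):
    """Wrap a plain value into a Column; a Column passes through as-is."""
    return value if isinstance(value, Column) else Column(("literal", value))


def hll_union_agg(col, allowDifferentLgConfigK=None):
    if allowDifferentLgConfigK is None:
        return ("hll_union_agg", col)
    # bool and Column are both handled by lit(); no branching needed here.
    return ("hll_union_agg", col, lit(allowDifferentLgConfigK))


assert lit(True).expr == ("literal", True)   # plain value gets wrapped
c = Column("flag")
assert lit(c) is c                           # column passes through
```

This is why the diff can collapse the conditional into a single `lit(allowDifferentLgConfigK)` call.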
Re: [PR] [SPARK-47883][SQL] Make `CollectTailExec.doExecute` lazy with RowQueue [spark]
zhengruifeng commented on PR #46101:
URL: https://github.com/apache/spark/pull/46101#issuecomment-2065722707

thanks, merged to master
Re: [PR] [SPARK-47883][SQL] Make `CollectTailExec.doExecute` lazy with RowQueue [spark]
zhengruifeng closed pull request #46101: [SPARK-47883][SQL] Make `CollectTailExec.doExecute` lazy with RowQueue
URL: https://github.com/apache/spark/pull/46101
Re: [PR] [SPARK-47845][SQL][PYTHON][CONNECT] Support Column type in split function for scala and python [spark]
zhengruifeng commented on code in PR #46045:
URL: https://github.com/apache/spark/pull/46045#discussion_r1571744450

## python/pyspark/sql/connect/functions/builtin.py:
@@ -2476,8 +2476,26 @@ def repeat(col: "ColumnOrName", n: Union["ColumnOrName", int]) -> Column:
 repeat.__doc__ = pysparkfuncs.repeat.__doc__

-def split(str: "ColumnOrName", pattern: str, limit: int = -1) -> Column:
-    return _invoke_function("split", _to_col(str), lit(pattern), lit(limit))
+def split(
+    str: "ColumnOrName",
+    pattern: Union[Column, str],
+    limit: Union["ColumnOrName", int] = -1,
+) -> Column:
+    # work around shadowing of str in the input variable name
+    from builtins import str as py_str
+
+    if isinstance(pattern, py_str):
+        _pattern = lit(pattern)
+    elif isinstance(pattern, Column):
+        _pattern = pattern
+    else:
+        raise PySparkTypeError(
+            error_class="NOT_COLUMN_OR_STR",
+            message_parameters={"arg_name": "pattern", "arg_type": type(pattern).__name__},
+        )
+
+    limit = lit(limit) if isinstance(limit, int) else _to_col(limit)
+    return _invoke_function("split", _to_col(str), _pattern, limit)

Review Comment: Only a few functions have such a check, and most functions don't check the types. We might need to figure out an easy way for type checking. As to this function, let's keep it simpler for now.
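One shape the "easy way for type checking" the reviewer mentions could take is a shared helper that several functions call instead of hand-written `isinstance` branches. The sketch below is purely hypothetical (the `Column` class, helper name, and error text are made up for illustration, not PySpark API):

```python
class Column:
    """Toy stand-in for a pyspark Column."""


def check_column_or_str(value, arg_name):
    """Raise a uniform error unless value is a Column or str."""
    if not isinstance(value, (Column, str)):
        raise TypeError(
            f"NOT_COLUMN_OR_STR: arg_name={arg_name}, "
            f"arg_type={type(value).__name__}"
        )
    return value


def split(s, pattern, limit=-1):
    # One shared call replaces the per-function isinstance ladder.
    check_column_or_str(pattern, "pattern")
    return ("split", s, pattern, limit)


assert split("col", "[,;]") == ("split", "col", "[,;]", -1)
try:
    split("col", 123)            # wrong type is rejected uniformly
except TypeError as e:
    assert "NOT_COLUMN_OR_STR" in str(e)
```

A helper like this keeps the error class and message parameters consistent across functions, which is the pain point the review thread identifies.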
[PR] [SPARK-47901][BUILD] Upgrade common-text 1.12.0 [spark]
LuciferYang opened a new pull request, #46127:
URL: https://github.com/apache/spark/pull/46127

### What changes were proposed in this pull request?

This PR aims to upgrade Apache commons-text from 1.11.0 to 1.12.0.

### Why are the changes needed?

The new version brings 2 bug fixes:
- TEXT-232: WordUtils.containsAllWords() may throw PatternSyntaxException
- TEXT-175: Fix regression for determining whitespace in WordUtils

The full release notes are as follows:
- https://github.com/apache/commons-text/blob/rel/commons-text-1.12.0/RELEASE-NOTES.txt

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?

No
Re: [PR] [SPARK-47897][SQL][3.5] Fix ExpressionSet performance regression in scala 2.12 [spark]
LuciferYang commented on PR #46114:
URL: https://github.com/apache/spark/pull/46114#issuecomment-2065694541

> > But for the `TestBenchmark` shown in the current pr description, there are some compilation errors when I manually copy it for testing:
>
> I guess you may be missing `import org.apache.spark.sql.catalyst.dsl.expressions._`

Thanks. Could you please add this import to the benchmark code in the PR description?
Re: [PR] [SPARK-47897][SQL][3.5] Fix ExpressionSet performance regression in scala 2.12 [spark]
LuciferYang commented on PR #46114:
URL: https://github.com/apache/spark/pull/46114#issuecomment-2065694767

Thanks @wForget
Re: [PR] [SPARK-47897][SQL][3.5] Fix ExpressionSet performance regression in scala 2.12 [spark]
wForget commented on PR #46114:
URL: https://github.com/apache/spark/pull/46114#issuecomment-2065691972

> But for the `TestBenchmark` shown in the current pr description, there are some compilation errors when I manually copy it for testing:

I guess you may be missing `import org.apache.spark.sql.catalyst.dsl.expressions._`
Re: [PR] [SPARK-47897][SQL][3.5] Fix ExpressionSet performance regression in scala 2.12 [spark]
yaooqinn closed pull request #46114: [SPARK-47897][SQL][3.5] Fix ExpressionSet performance regression in scala 2.12
URL: https://github.com/apache/spark/pull/46114
Re: [PR] [SPARK-47897][SQL][3.5] Fix ExpressionSet performance regression in scala 2.12 [spark]
LuciferYang commented on PR #46114:
URL: https://github.com/apache/spark/pull/46114#issuecomment-2065687713

Thanks for your fix @wForget. But for the `TestBenchmark` shown in the current PR description, there are some compilation errors when I manually copy it for testing:

```
[ERROR] [Error] /spark/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TestBenchmark.scala:29: type mismatch;
 found   : Int
 required: String
[ERROR] [Error] /spark/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TestBenchmark.scala:31: type mismatch;
 found   : Int
 required: String
[ERROR] two errors found
```

Could you please correct it in the PR description?
Re: [PR] [SPARK-47897][SQL][3.5] Fix ExpressionSet performance regression in scala 2.12 [spark]
yaooqinn commented on PR #46114:
URL: https://github.com/apache/spark/pull/46114#issuecomment-2065660603

Thank you @wForget, and @dongjoon-hyun @viirya @minyyy @cloud-fan

Merged to '3.5.2', '3.4.4'
Re: [PR] [SPARK-46935][DOCS] Consolidate error documentation [spark]
cloud-fan closed pull request #44971: [SPARK-46935][DOCS] Consolidate error documentation
URL: https://github.com/apache/spark/pull/44971
Re: [PR] [SPARK-46935][DOCS] Consolidate error documentation [spark]
cloud-fan commented on PR #44971:
URL: https://github.com/apache/spark/pull/44971#issuecomment-2065652709

thanks, merging to master!
Re: [PR] [SPARK-47902][SQL] Making Compute Current Time* expressions foldable [spark]
cloud-fan commented on code in PR #46120:
URL: https://github.com/apache/spark/pull/46120#discussion_r1571683122

## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala:
@@ -437,6 +437,21 @@ class ConstantFoldingSuite extends PlanTest {
       Optimize.execute(oneRowScalarSubquery),
       oneRowScalarSubquery)
   }
+
+  test("Current time functions are constant folded") {

Review Comment: I think what we need is an analyzer test to make sure we can use current_time in functions that require folding input, like `rand`. How about we move the test to `ExpressionTypeCheckingSuite`?
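The foldability being discussed can be sketched with a toy model (this is an illustration of the concept, not Catalyst's actual `ConstantFolding` rule; the class names and the timestamp value are made up): once an expression reports itself foldable, a constant-folding pass can replace it with a literal, and checks like "`rand()` requires a foldable seed" then accept it.

```python
class Literal:
    foldable = True

    def __init__(self, value):
        self.value = value

    def eval(self):
        return self.value


class CurrentTime:
    foldable = True                 # the change discussed above: now foldable

    def eval(self):
        return 1713400000           # fixed stand-in for the query-start time


class Rand:
    def __init__(self, seed):
        if not seed.foldable:       # analyzer check: seed must fold to a constant
            raise TypeError("rand() seed must be foldable")
        self.seed = seed


def constant_fold(expr):
    """Replace a foldable expression with the literal it evaluates to."""
    return Literal(expr.eval()) if expr.foldable else expr


seed = constant_fold(CurrentTime())
assert isinstance(seed, Literal) and seed.value == 1713400000
Rand(seed)                          # accepted, since the seed folded to a literal
```

This is also why an analyzer-level test makes sense: the interesting behavior is that expressions requiring foldable inputs now accept `current_time`, not just that the fold happens.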
Re: [PR] [SPARK-47890][CONNECT][PYTHON] Add variant functions to Scala and Python. [spark]
zhengruifeng commented on code in PR #46123:
URL: https://github.com/apache/spark/pull/46123#discussion_r1571680929

## sql/core/src/main/scala/org/apache/spark/sql/functions.scala:
@@ -6604,6 +6604,71 @@ object functions {
    */
   def parse_json(json: Column): Column = Column.fn("parse_json", json)
+
+  /**
+   * Check if a variant value is a variant null. Returns true if and only if the input is a
+   * variant null and false otherwise (including in the case of SQL NULL).
+   *
+   * @param v
+   *   a variant column.
+   * @group variant_funcs

Review Comment: ditto
Re: [PR] [SPARK-47890][CONNECT][PYTHON] Add variant functions to Scala and Python. [spark]
zhengruifeng commented on code in PR #46123:
URL: https://github.com/apache/spark/pull/46123#discussion_r1571680727

## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
@@ -6975,6 +6975,71 @@ object functions {
    */
   def parse_json(json: Column): Column = Column.fn("parse_json", json)
+
+  /**
+   * Check if a variant value is a variant null. Returns true if and only if the input is a
+   * variant null and false otherwise (including in the case of SQL NULL).
+   *
+   * @param v
+   *   a variant column.
+   * @group variant_funcs

Review Comment: it seems we don't have this `variant_funcs` group
Re: [PR] [SPARK-47890][CONNECT][PYTHON] Add variant functions to Scala and Python. [spark]
zhengruifeng commented on code in PR #46123:
URL: https://github.com/apache/spark/pull/46123#discussion_r1571679330

## python/docs/source/reference/pyspark.sql/functions.rst:
@@ -533,6 +533,11 @@ JSON Functions
     json_object_keys
     json_tuple
     parse_json
+    is_variant_null

Review Comment: nit: let's sort the function names alphabetically
Re: [PR] [SPARK-47897][SQL][3.5] Fix ExpressionSet performance regression in scala 2.12 [spark]
wForget commented on code in PR #46114:
URL: https://github.com/apache/spark/pull/46114#discussion_r1571644885

## sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala:
@@ -108,12 +108,24 @@ class ExpressionSet protected(
     newSet
   }

+  override def ++(elems: GenTraversableOnce[Expression]): ExpressionSet = {

Review Comment: thanks, added.
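The mechanism behind the regression this thread discusses can be sketched with a toy Python model (not Spark's `ExpressionSet`; the canonicalization is a stand-in): if the inherited `++` falls back to adding elements one at a time, each add can pay the copy/canonicalization cost again, whereas an overridden bulk method builds a single copy and adds into it.

```python
class ExprSet:
    """Toy dedup-by-canonical-form set."""

    def __init__(self, exprs=()):
        self.canonical = set()
        self.exprs = []
        for e in exprs:
            self._add(e)

    @staticmethod
    def _canonicalize(e):
        return e.lower()            # stand-in for expression canonicalization

    def _add(self, e):
        c = self._canonicalize(e)
        if c not in self.canonical:
            self.canonical.add(c)
            self.exprs.append(e)

    def union(self, elems):
        """Bulk ++: one mutable copy, instead of one new set per element."""
        out = ExprSet(self.exprs)
        for e in elems:
            out._add(e)
        return out


s = ExprSet(["a + b"]).union(["A + B", "c"])   # "A + B" dedups canonically
assert s.exprs == ["a + b", "c"]
```

The override in the diff plays the role of `union` here: callers that merge whole collections no longer pay a per-element set reconstruction.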
Re: [PR] [SPARK-47897][SQL][3.5] Fix ExpressionSet performance regression in scala 2.12 [spark]
wForget commented on PR #46114:
URL: https://github.com/apache/spark/pull/46114#issuecomment-2065622505

> If this is a regression at [SPARK-38836](https://issues.apache.org/jira/browse/SPARK-38836), do we need to fix this at `branch-3.4`, too, @wForget ?

I think it's needed.
Re: [PR] Operator 0.1.0 [spark-kubernetes-operator]
jiangzho commented on PR #2: URL: https://github.com/apache/spark-kubernetes-operator/pull/2#issuecomment-2065538781

Rebase pick failed due to schedule change - attempted a non-fast-forward merge commit. I may do a final squash to make the history clear.
Re: [PR] [SPARK-47903][PYTHON] Add support for remaining scalar types in the PySpark Variant library [spark]
harshmotw-db commented on code in PR #46122: URL: https://github.com/apache/spark/pull/46122#discussion_r1571554473

## python/pyspark/sql/types.py:
```diff
@@ -1521,6 +1521,18 @@ def toPython(self) -> Any:
         """
         return VariantUtils.to_python(self.value, self.metadata)

+    def toJson(self, zone_id: str = "UTC") -> Any:
+        """
+        Convert the VariantVal to a JSON string. The zone ID represents the time zone that the
+        timestamp should be printed in. It is defaulted to UTC. The list of valid zone IDs can be
+        found here:
+        https://gist.github.com/heyalexej/8bf688fd67d7199be4a1682b3eec7568
```

Review Comment: Thanks! Fixed
Re: [PR] [SPARK-47858][SPARK-47852][PYTHON][SQL] Refactoring the structure for DataFrame error context [spark]
itholic commented on PR #46063: URL: https://github.com/apache/spark/pull/46063#issuecomment-2065533422 Thanks for spotting and addressing this, @panbingkun !
Re: [PR] [SPARK-47858][SPARK-47852][PYTHON][SQL][FOLLOWUP] Fix Python Liner [spark]
itholic commented on PR #46117: URL: https://github.com/apache/spark/pull/46117#issuecomment-2065532991 Late LGTM. Thanks!
Re: [PR] [SPARK-47371][SQL][FOLLOWUP] XML: Stop ignoring CDATA within row tags [spark]
HyukjinKwon commented on PR #46121: URL: https://github.com/apache/spark/pull/46121#issuecomment-2065516318 Merged to master.
Re: [PR] [SPARK-47371][SQL][FOLLOWUP] XML: Stop ignoring CDATA within row tags [spark]
HyukjinKwon closed pull request #46121: [SPARK-47371][SQL][FOLLOWUP] XML: Stop ignoring CDATA within row tags URL: https://github.com/apache/spark/pull/46121
Re: [PR] [SPARK-47903][PYTHON] Add support for remaining scalar types in the PySpark Variant library [spark]
HyukjinKwon commented on PR #46122: URL: https://github.com/apache/spark/pull/46122#issuecomment-2065515295 Seems fine but I would like @gene-db to sign off
Re: [PR] [SPARK-47903][PYTHON] Add support for remaining scalar types in the PySpark Variant library [spark]
HyukjinKwon commented on code in PR #46122: URL: https://github.com/apache/spark/pull/46122#discussion_r1571540998

## python/pyspark/sql/types.py:
```diff
@@ -1521,6 +1521,18 @@ def toPython(self) -> Any:
         """
         return VariantUtils.to_python(self.value, self.metadata)

+    def toJson(self, zone_id: str = "UTC") -> Any:
+        """
+        Convert the VariantVal to a JSON string. The zone ID represents the time zone that the
+        timestamp should be printed in. It is defaulted to UTC. The list of valid zone IDs can be
+        found here:
+        https://gist.github.com/heyalexej/8bf688fd67d7199be4a1682b3eec7568
```

Review Comment: Let's add a new line here, otherwise the Sphinx format is broken. Also the http link should better be like:
```
`here <https://gist.github.com/heyalexej/8bf688fd67d7199be4a1682b3eec7568>`_
```
Re: [PR] [SPARK-47890][CONNECT][PYTHON] Add variant functions to Scala and Python. [spark]
HyukjinKwon commented on code in PR #46123: URL: https://github.com/apache/spark/pull/46123#discussion_r1571532818

## python/pyspark/sql/functions/builtin.py:
```diff
@@ -15452,6 +15452,135 @@ def parse_json(
     return _invoke_function("parse_json", _to_java_column(col))


+@_try_remote_functions
+def is_variant_null(v: "ColumnOrName") -> Column:
+    """
+    Check if a variant value is a variant null. Returns true if and only if the input is a variant
+    null and false otherwise (including in the case of SQL NULL).
+
+    .. versionadded:: 4.0.0
+
```

Review Comment: Can we add a `Returns` section (https://numpydoc.readthedocs.io/en/latest/format.html#returns)?
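As a reference for the requested change, a numpydoc-style docstring with a `Returns` section might look like the sketch below. The stub signature and wording are illustrative, not the PR's final text:

```python
from typing import Any


def is_variant_null(v: Any) -> Any:
    """
    Check if a variant value is a variant null.

    .. versionadded:: 4.0.0

    Parameters
    ----------
    v : Column or str
        a variant column.

    Returns
    -------
    :class:`~pyspark.sql.Column`
        True if and only if the input is a variant null, False otherwise
        (including in the case of SQL NULL).
    """
    raise NotImplementedError  # placeholder body; only the docstring shape matters here
```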
Re: [PR] [SPARK-44461][FOLLOWUP][SS][CONNECT] Remove unneeded TODOs [spark]
HyukjinKwon commented on PR #46124: URL: https://github.com/apache/spark/pull/46124#issuecomment-2065501307 Merged to master.
Re: [PR] [SPARK-47816][CONNECT][DOCS][FOLLOWUP] refine the description [spark]
HyukjinKwon closed pull request #46118: [SPARK-47816][CONNECT][DOCS][FOLLOWUP] refine the description URL: https://github.com/apache/spark/pull/46118
Re: [PR] [SPARK-47816][CONNECT][DOCS][FOLLOWUP] refine the description [spark]
HyukjinKwon commented on PR #46118: URL: https://github.com/apache/spark/pull/46118#issuecomment-2065501774 Merged to master.
Re: [PR] [SPARK-44461][FOLLOWUP][SS][CONNECT] Remove unneeded TODOs [spark]
HyukjinKwon closed pull request #46124: [SPARK-44461][FOLLOWUP][SS][CONNECT] Remove unneeded TODOs URL: https://github.com/apache/spark/pull/46124
Re: [PR] [SPARK-47890][CONNECT][PYTHON] Add variant functions to Scala and Python. [spark]
chenhao-db commented on PR #46123: URL: https://github.com/apache/spark/pull/46123#issuecomment-2065464556 @HyukjinKwon could you help review? Thanks!
Re: [PR] [SPARK-47904][SQL] Preserve case in Avro schema when using enableStableIdentifiersForUnionType [spark]
sadikovi commented on PR #46126: URL: https://github.com/apache/spark/pull/46126#issuecomment-2065446238 cc @dongjoon-hyun
[PR] [SPARK-47904][SQL] Preserve case in Avro schema when using enableStableIdentifiersForUnionType [spark]
sadikovi opened a new pull request, #46126: URL: https://github.com/apache/spark/pull/46126

### What changes were proposed in this pull request?
When `enableStableIdentifiersForUnionType` is enabled, all of the types are lowercased, which creates a problem when field types are case-sensitive. A union type with fields:
```
Schema.createEnum("myENUM", "", null, List[String]("E1", "e2").asJava),
Schema.createRecord("myRecord2", "", null, false, List[Schema.Field](new Schema.Field("F", Schema.create(Type.FLOAT))).asJava)
```
would become
```
struct>
```
but instead should be
```
struct>
```
### Why are the changes needed?
Fixes a bug of lowercasing the field name (the type portion).
### Does this PR introduce _any_ user-facing change?
Yes, if a user enables `enableStableIdentifiersForUnionType` and has union types, all fields will preserve the case. Previously, the field names would be all in lowercase.
### How was this patch tested?
I added a test case to verify the new field names.
### Was this patch authored or co-authored using generative AI tooling?
No.
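The naming behavior at issue can be sketched in a few lines. The `member_` prefix and the exact rule below are assumptions based on the PR description, not Spark's actual implementation:

```python
def stable_union_field_name(type_name: str, preserve_case: bool = True) -> str:
    # Hypothetical sketch of a stable-identifier field name for an Avro union
    # member: "member_" + type name. The bug was that the type-name portion
    # was unconditionally lowercased; the fix preserves its case.
    suffix = type_name if preserve_case else type_name.lower()
    return "member_" + suffix


# Before the fix: case-sensitive type names collapse to lowercase.
assert stable_union_field_name("myENUM", preserve_case=False) == "member_myenum"
# After the fix: the original case of "myENUM" and "myRecord2" is preserved.
assert stable_union_field_name("myENUM") == "member_myENUM"
assert stable_union_field_name("myRecord2") == "member_myRecord2"
```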
Re: [PR] [WIP] Testing that error is propagated to user upon deserialization [spark]
WweiL commented on code in PR #46125: URL: https://github.com/apache/spark/pull/46125#discussion_r1571450947

## python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py:
```diff
@@ -63,8 +63,11 @@ def main(infile: IO, outfile: IO) -> None:
         spark = spark_connect_session

         # TODO(SPARK-44461): Enable Process Isolation
-
-        func = worker.read_command(pickle_ser, infile)
+        read_command_exception = None
+        try:
+            func = worker.read_command(pickle_ser, infile)
+        except Exception as e:
+            read_command_exception = e
         write_int(0, outfile)  # Indicate successful initialization
```

Review Comment: I would actually do a `write_int(-1, outfile)`, then directly write out the error with `handle_worker_exception`, to indicate an unsuccessful initialization. And in [`StreamingPythonRunner`](https://github.com/apache/spark/blob/add49b3c115f34ab8e693f7e67579292afface4c/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala#L94), where we receive this "0", we check if it's -1; if so, read the error and throw.
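The initialization handshake being discussed boils down to a 4-byte integer status code on the worker's output stream. A minimal sketch of the suggested `-1` signaling, with both sides emulated in one process (the `write_int`/`read_int` helpers mirror `pyspark.serializers`, but this is an illustration, not Spark's actual code):

```python
import io
import struct


def write_int(value: int, stream) -> None:
    # Big-endian 4-byte signed int, mirroring pyspark.serializers.write_int.
    stream.write(struct.pack("!i", value))


def read_int(stream) -> int:
    return struct.unpack("!i", stream.read(4))[0]


# Worker side: signal a failed initialization with -1 instead of 0.
outfile = io.BytesIO()
write_int(-1, outfile)

# Runner side (the JVM's StreamingPythonRunner in Spark; emulated here):
# branch on the status code before proceeding.
outfile.seek(0)
status = read_int(outfile)
init_failed = status == -1  # if so, read the serialized error next and rethrow
```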
Re: [PR] [WIP] Testing that error is propagated to user upon deserialization [spark]
WweiL commented on code in PR #46125: URL: https://github.com/apache/spark/pull/46125#discussion_r1571451873

## python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py:
```diff
@@ -66,6 +66,26 @@ def func(df, _):
         q = df.writeStream.foreachBatch(func).start()
         q.processAllAvailable()

+    def test_pickling_deserialization_error(self):
+        class NoUnpickle:
+            def __reduce__(self):
+                if isinstance(self, type(None)):
+                    raise TypeError("Cannot unpickle instance of NoUnpickle")
+                return type(self), ()
```

Review Comment: I'm having a hard time parsing this code. So what's happening here is: during unpickle, `type(self)`, which is returned from `__reduce__`, is called, then somehow "self" is None...? And the TypeError is raised?
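A working version of the pickle-fine-but-fail-to-unpickle trick the test seems to be after could look like this (a sketch, not the PR's actual test): `__reduce__` returns a reconstruction callable that raises, so the error surfaces only during deserialization.

```python
import pickle


def _raise_on_unpickle():
    # Invoked by pickle.loads when reconstructing the object.
    raise TypeError("Cannot unpickle instance of NoUnpickle")


class NoUnpickle:
    def __reduce__(self):
        # Pickling succeeds: we store a reference to a callable that
        # blows up when called at load time.
        return (_raise_on_unpickle, ())


data = pickle.dumps(NoUnpickle())  # serialization is fine
try:
    pickle.loads(data)             # deserialization raises
    err = None
except TypeError as e:
    err = e
```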
[PR] [WIP] Testing that error is propagated to user upon deserialization [spark]
ericm-db opened a new pull request, #46125: URL: https://github.com/apache/spark/pull/46125

### What changes were proposed in this pull request?
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-44461][FOLLOWUP][SS][CONNECT] Remove unneeded TODOs [spark]
WweiL commented on PR #46124: URL: https://github.com/apache/spark/pull/46124#issuecomment-2065396643 @HyukjinKwon Can I get a stamp of this? Thank you!
[PR] [SPARK-44461][FOLLOWUP][SS][CONNECT] Remove unneeded TODOs [spark]
WweiL opened a new pull request, #46124: URL: https://github.com/apache/spark/pull/46124

### What changes were proposed in this pull request?
Remove unneeded todos
### Why are the changes needed?
code cleanup
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No need
### Was this patch authored or co-authored using generative AI tooling?
No
Re: [PR] [SPARK-47903][PySpark] Add support for remaining scalar types in the PySpark Variant library [spark]
harshmotw-db commented on code in PR #46122: URL: https://github.com/apache/spark/pull/46122#discussion_r1571395999

## common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java:
```diff
@@ -223,7 +223,7 @@ public void appendFloat(float f) {

   public void appendBinary(byte[] binary) {
     checkCapacity(1 + U32_SIZE + binary.length);
-    writeBuffer[writePos++] = primitiveHeader(LONG_STR);
+    writeBuffer[writePos++] = primitiveHeader(BINARY);
```

Review Comment: Should be resolved now!
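For context, the header byte being changed encodes the primitive type id of the value that follows. A sketch of the encoding (the `(type_id << 2) | basic_type` layout is taken from the open Variant binary spec; treat the exact bit layout as an assumption here):

```python
# Type ids matching the constants in the diff.
BINARY = 15
LONG_STR = 16
PRIMITIVE = 0  # basic type, stored in the low 2 bits


def primitive_header(type_id: int) -> int:
    # Header byte for a primitive variant value: type id in the upper 6 bits,
    # basic type in the lower 2 bits.
    return (type_id << 2) | PRIMITIVE


# Both BINARY and LONG_STR payloads are "4-byte length + bytes"; the one-byte
# header is the only thing distinguishing them, which is why writing the
# LONG_STR header for binary data was a bug.
assert primitive_header(BINARY) != primitive_header(LONG_STR)
```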
Re: [PR] [SPARK-47903][PySpark] Add support for remaining scalar types in the PySpark Variant library [spark]
harshmotw-db commented on code in PR #46122: URL: https://github.com/apache/spark/pull/46122#discussion_r1571386993

## python/pyspark/sql/variant_utils.py:
```diff
@@ -86,19 +88,40 @@ class VariantUtils:
     DECIMAL8 = 9
     # 16-byte decimal. Content is 1-byte scale + 16-byte little-endian signed integer.
     DECIMAL16 = 10
+    # Date value. Content is 4-byte little-endian signed integer that represents the number of days
+    # from the Unix epoch.
+    DATE = 11
+    # Timestamp value. Content is 8-byte little-endian signed integer that represents the number of
+    # microseconds elapsed since the Unix epoch, 1970-01-01 00:00:00 UTC. It is displayed to users in
+    # their local time zones and may be displayed differently depending on the execution environment.
+    TIMESTAMP = 12
+    # Timestamp_ntz value. It has the same content as `TIMESTAMP` but should always be interpreted
+    # as if the local time zone is UTC.
+    TIMESTAMP_NTZ = 13
+    # 4-byte IEEE float.
+    FLOAT = 14
+    # Binary value. The content is (4-byte little-endian unsigned integer representing the binary
+    # size) + (size bytes of binary content).
+    BINARY = 15
     # Long string value. The content is (4-byte little-endian unsigned integer representing the
     # string size) + (size bytes of string content).
     LONG_STR = 16

     U32_SIZE = 4

+    EPOCH = datetime.datetime(year = 1970, month = 1, day = 1, hour = 0, minute = 0, second = 0,
+        tzinfo = datetime.timezone.utc)
+    EPOCH_NTZ = datetime.datetime(year = 1970, month = 1, day = 1, hour = 0, minute = 0, second = 0)
+
+    # The valid zone ids can be found here:
+    # https://gist.github.com/heyalexej/8bf688fd67d7199be4a1682b3eec7568
     @classmethod
-    def to_json(cls, value: bytes, metadata: bytes) -> str:
+    def to_json(cls, value: bytes, metadata: bytes, zone_id: str = "UTC") -> str:
```

Review Comment: Yes, `__str__` calls `to_json`. Sure, I'll add a `toJson` method too.
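The `TIMESTAMP` representation described in the diff above (microseconds since the Unix epoch, rendered in a target zone) can be reproduced with the standard library. A sketch, not the PR's code; `zoneinfo` requires Python 3.9+ and system tzdata:

```python
import datetime
from zoneinfo import ZoneInfo

# The epoch constant, as in the diff above.
EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)

# A variant TIMESTAMP stores microseconds elapsed since the epoch...
micros = 1_000_000_000_000_000  # 10^9 seconds after the epoch
ts_utc = EPOCH + datetime.timedelta(microseconds=micros)
print(ts_utc.isoformat())  # 2001-09-09T01:46:40+00:00

# ...and a zone id like "America/Los_Angeles" only changes how the same
# instant is displayed, not the instant itself.
ts_la = ts_utc.astimezone(ZoneInfo("America/Los_Angeles"))
```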
Re: [PR] [SPARK-47903][PySpark] Add support for remaining scalar types in the PySpark Variant library [spark]
harshmotw-db commented on code in PR #46122: URL: https://github.com/apache/spark/pull/46122#discussion_r1571385965

## common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java:
```diff
@@ -223,7 +223,7 @@ public void appendFloat(float f) {

   public void appendBinary(byte[] binary) {
     checkCapacity(1 + U32_SIZE + binary.length);
-    writeBuffer[writePos++] = primitiveHeader(LONG_STR);
+    writeBuffer[writePos++] = primitiveHeader(BINARY);
```

Review Comment: Yes, I'll try to resolve it. I think I've made a bit of a mess with the master branch in my fork.
Re: [PR] [SPARK-47903][PySpark] Add support for remaining scalar types in the PySpark Variant library [spark]
gene-db commented on code in PR #46122: URL: https://github.com/apache/spark/pull/46122#discussion_r1571377546

## python/pyspark/sql/variant_utils.py:
```diff
@@ -86,19 +88,40 @@ class VariantUtils:
     DECIMAL8 = 9
     # 16-byte decimal. Content is 1-byte scale + 16-byte little-endian signed integer.
     DECIMAL16 = 10
+    # Date value. Content is 4-byte little-endian signed integer that represents the number of days
+    # from the Unix epoch.
+    DATE = 11
+    # Timestamp value. Content is 8-byte little-endian signed integer that represents the number of
+    # microseconds elapsed since the Unix epoch, 1970-01-01 00:00:00 UTC. It is displayed to users in
+    # their local time zones and may be displayed differently depending on the execution environment.
+    TIMESTAMP = 12
+    # Timestamp_ntz value. It has the same content as `TIMESTAMP` but should always be interpreted
+    # as if the local time zone is UTC.
+    TIMESTAMP_NTZ = 13
+    # 4-byte IEEE float.
+    FLOAT = 14
+    # Binary value. The content is (4-byte little-endian unsigned integer representing the binary
+    # size) + (size bytes of binary content).
+    BINARY = 15
     # Long string value. The content is (4-byte little-endian unsigned integer representing the
     # string size) + (size bytes of string content).
     LONG_STR = 16

     U32_SIZE = 4

+    EPOCH = datetime.datetime(year = 1970, month = 1, day = 1, hour = 0, minute = 0, second = 0,
+        tzinfo = datetime.timezone.utc)
+    EPOCH_NTZ = datetime.datetime(year = 1970, month = 1, day = 1, hour = 0, minute = 0, second = 0)
+
+    # The valid zone ids can be found here:
+    # https://gist.github.com/heyalexej/8bf688fd67d7199be4a1682b3eec7568
     @classmethod
-    def to_json(cls, value: bytes, metadata: bytes) -> str:
+    def to_json(cls, value: bytes, metadata: bytes, zone_id: str = "UTC") -> str:
         """
         Convert the VariantVal to a JSON string.
```

Review Comment: Can you update these comments to mention the `zone_id`, and that UTC is the default behavior?

## python/pyspark/sql/variant_utils.py:
```diff
@@ -86,19 +88,40 @@ class VariantUtils:
     DECIMAL8 = 9
     # 16-byte decimal. Content is 1-byte scale + 16-byte little-endian signed integer.
     DECIMAL16 = 10
+    # Date value. Content is 4-byte little-endian signed integer that represents the number of days
+    # from the Unix epoch.
+    DATE = 11
+    # Timestamp value. Content is 8-byte little-endian signed integer that represents the number of
+    # microseconds elapsed since the Unix epoch, 1970-01-01 00:00:00 UTC. It is displayed to users in
+    # their local time zones and may be displayed differently depending on the execution environment.
+    TIMESTAMP = 12
+    # Timestamp_ntz value. It has the same content as `TIMESTAMP` but should always be interpreted
+    # as if the local time zone is UTC.
+    TIMESTAMP_NTZ = 13
+    # 4-byte IEEE float.
+    FLOAT = 14
+    # Binary value. The content is (4-byte little-endian unsigned integer representing the binary
+    # size) + (size bytes of binary content).
+    BINARY = 15
     # Long string value. The content is (4-byte little-endian unsigned integer representing the
     # string size) + (size bytes of string content).
     LONG_STR = 16

     U32_SIZE = 4

+    EPOCH = datetime.datetime(year = 1970, month = 1, day = 1, hour = 0, minute = 0, second = 0,
+        tzinfo = datetime.timezone.utc)
+    EPOCH_NTZ = datetime.datetime(year = 1970, month = 1, day = 1, hour = 0, minute = 0, second = 0)
+
+    # The valid zone ids can be found here:
+    # https://gist.github.com/heyalexej/8bf688fd67d7199be4a1682b3eec7568
     @classmethod
-    def to_json(cls, value: bytes, metadata: bytes) -> str:
+    def to_json(cls, value: bytes, metadata: bytes, zone_id: str = "UTC") -> str:
```

Review Comment: This reminds me, in `sql/types.py`, we have a `VariantVal` which calls this `to_json`. However, I think just `__str__` calls `to_json`. Should we add a `toJson()` which can take in an optional `zone_id`?
Re: [PR] [SPARK-47903][PySpark] Add support for remaining scalar types in the PySpark Variant library [spark]
chenhao-db commented on code in PR #46122: URL: https://github.com/apache/spark/pull/46122#discussion_r1571378396

## common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java:
```diff
@@ -223,7 +223,7 @@ public void appendFloat(float f) {

   public void appendBinary(byte[] binary) {
     checkCapacity(1 + U32_SIZE + binary.length);
-    writeBuffer[writePos++] = primitiveHeader(LONG_STR);
+    writeBuffer[writePos++] = primitiveHeader(BINARY);
```

Review Comment: I think this change is already in master?
[PR] [SPARK-47903][PySpark] Add support for remaining scalar types in the PySpark Variant library [spark]
harshmotw-db opened a new pull request, #46122: URL: https://github.com/apache/spark/pull/46122

### What changes were proposed in this pull request?

### Why are the changes needed?
Added support for the `date`, `timestamp`, `timestamp_ntz`, `float` and `binary` scalar types to the variant library in Python. Data of these types can also be extracted now from a variant.
### Does this PR introduce _any_ user-facing change?
Yes, users can now use PySpark to extract data of more types from Variants.
### How was this patch tested?
Unit tests
### Was this patch authored or co-authored using generative AI tooling?
No
Re: [PR] [SPARK-47891][PYTHON][DOCS] Improve docstring of mapInPandas [spark]
xinrong-meng commented on PR #46108: URL: https://github.com/apache/spark/pull/46108#issuecomment-2065050296 Thanks!
Re: [PR] [SPARK-47805][SS] Implementing TTL for MapState [spark]
ericm-db commented on code in PR #45991: URL: https://github.com/apache/spark/pull/45991#discussion_r1571187342 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateTTLSuite.scala: ## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import java.time.Duration + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.execution.streaming.{MapStateImplWithTTL, MemoryStream} +import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.util.StreamManualClock + +class MapStateSingleKeyTTLProcessor(ttlConfig: TTLConfig) + extends StatefulProcessor[String, InputEvent, OutputEvent] +with Logging { + + @transient private var _mapState: MapStateImplWithTTL[String, Int] = _ + + override def init( + outputMode: OutputMode, + timeMode: TimeMode): Unit = { +_mapState = getHandle + .getMapState("mapState", Encoders.STRING, Encoders.scalaInt, ttlConfig) + .asInstanceOf[MapStateImplWithTTL[String, Int]] + } + override def handleInputRows( +key: String, +inputRows: Iterator[InputEvent], +timerValues: TimerValues, +expiredTimerInfo: ExpiredTimerInfo): Iterator[OutputEvent] = { +var results = List[OutputEvent]() + +for (row <- inputRows) { + val resultIter = processRow(row, _mapState) + resultIter.foreach { r => +results = r :: results + } +} + +results.iterator + } + + def processRow( + row: InputEvent, + mapState: MapStateImplWithTTL[String, Int]): Iterator[OutputEvent] = { +var results = List[OutputEvent]() +val key = row.key +val userKey = "key" +if (row.action == "get") { + if (mapState.containsKey(userKey)) { +results = OutputEvent(key, mapState.getValue(userKey), isTTLValue = false, -1) :: results + } +} else if (row.action == "get_without_enforcing_ttl") { + val currState = mapState.getWithoutEnforcingTTL(userKey) + if (currState.isDefined) { +results = OutputEvent(key, currState.get, isTTLValue = false, -1) :: results + } +} else if (row.action == "get_ttl_value_from_state") { + val ttlValue = mapState.getTTLValue(userKey) + if (ttlValue.isDefined) { +val value = ttlValue.get._1 +val ttlExpiration = 
ttlValue.get._2 +results = OutputEvent(key, value, isTTLValue = true, ttlExpiration) :: results + } +} else if (row.action == "put") { + mapState.updateValue(userKey, row.value) +} else if (row.action == "get_values_in_ttl_state") { + val ttlValues = mapState.getValuesInTTLState() + ttlValues.foreach { v => +results = OutputEvent(key, -1, isTTLValue = true, ttlValue = v) :: results + } +} + +results.iterator + } +} + +case class MapInputEvent( +key: String, +userKey: String, +action: String, +value: Int) + +case class MapOutputEvent( +key: String, +userKey: String, +value: Int, +isTTLValue: Boolean, +ttlValue: Long) + +class MapStateTTLProcessor(ttlConfig: TTLConfig) + extends StatefulProcessor[String, MapInputEvent, MapOutputEvent] +with Logging { + + @transient private var _mapState: MapStateImplWithTTL[String, Int] = _ + + override def init( + outputMode: OutputMode, + timeMode: TimeMode): Unit = { +_mapState = getHandle + .getMapState("mapState", Encoders.STRING, Encoders.scalaInt, ttlConfig) + .asInstanceOf[MapStateImplWithTTL[String, Int]] + } + + override def handleInputRows( + key: String, + inputRows: Iterator[MapInputEvent], + timerValues: TimerValues, + expiredTimerInfo: ExpiredTimerInfo): Iterator[MapOutputEvent] = { +var results = List[MapOutputEvent]() + +for (row <- inputRows) { + val resultIter = processRow(row, _mapState) + resultIter.foreach { r => +results = r :: results + } +} + +results.iterator + } + + def processRow( + row: MapInputEvent, + mapState: MapStateImplWithTTL[String, Int]):
Re: [PR] [SPARK-47805][SS] Implementing TTL for MapState [spark]
anishshri-db commented on code in PR #45991: URL: https://github.com/apache/spark/pull/45991#discussion_r1571185285 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateTTLSuite.scala: ## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import java.time.Duration + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.execution.streaming.{MapStateImplWithTTL, MemoryStream} +import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.util.StreamManualClock + +class MapStateSingleKeyTTLProcessor(ttlConfig: TTLConfig) + extends StatefulProcessor[String, InputEvent, OutputEvent] +with Logging { + + @transient private var _mapState: MapStateImplWithTTL[String, Int] = _ + + override def init( + outputMode: OutputMode, + timeMode: TimeMode): Unit = { +_mapState = getHandle + .getMapState("mapState", Encoders.STRING, Encoders.scalaInt, ttlConfig) + .asInstanceOf[MapStateImplWithTTL[String, Int]] + } + override def handleInputRows( +key: String, +inputRows: Iterator[InputEvent], +timerValues: TimerValues, +expiredTimerInfo: ExpiredTimerInfo): Iterator[OutputEvent] = { +var results = List[OutputEvent]() + +for (row <- inputRows) { + val resultIter = processRow(row, _mapState) + resultIter.foreach { r => +results = r :: results + } +} + +results.iterator + } + + def processRow( + row: InputEvent, + mapState: MapStateImplWithTTL[String, Int]): Iterator[OutputEvent] = { +var results = List[OutputEvent]() +val key = row.key +val userKey = "key" +if (row.action == "get") { + if (mapState.containsKey(userKey)) { +results = OutputEvent(key, mapState.getValue(userKey), isTTLValue = false, -1) :: results + } +} else if (row.action == "get_without_enforcing_ttl") { + val currState = mapState.getWithoutEnforcingTTL(userKey) + if (currState.isDefined) { +results = OutputEvent(key, currState.get, isTTLValue = false, -1) :: results + } +} else if (row.action == "get_ttl_value_from_state") { + val ttlValue = mapState.getTTLValue(userKey) + if (ttlValue.isDefined) { +val value = ttlValue.get._1 +val ttlExpiration = 
ttlValue.get._2 +results = OutputEvent(key, value, isTTLValue = true, ttlExpiration) :: results + } +} else if (row.action == "put") { + mapState.updateValue(userKey, row.value) +} else if (row.action == "get_values_in_ttl_state") { + val ttlValues = mapState.getValuesInTTLState() + ttlValues.foreach { v => +results = OutputEvent(key, -1, isTTLValue = true, ttlValue = v) :: results + } +} + +results.iterator + } +} + +case class MapInputEvent( +key: String, +userKey: String, +action: String, +value: Int) + +case class MapOutputEvent( +key: String, +userKey: String, +value: Int, +isTTLValue: Boolean, +ttlValue: Long) + +class MapStateTTLProcessor(ttlConfig: TTLConfig) + extends StatefulProcessor[String, MapInputEvent, MapOutputEvent] +with Logging { + + @transient private var _mapState: MapStateImplWithTTL[String, Int] = _ + + override def init( + outputMode: OutputMode, + timeMode: TimeMode): Unit = { +_mapState = getHandle + .getMapState("mapState", Encoders.STRING, Encoders.scalaInt, ttlConfig) + .asInstanceOf[MapStateImplWithTTL[String, Int]] + } + + override def handleInputRows( + key: String, + inputRows: Iterator[MapInputEvent], + timerValues: TimerValues, + expiredTimerInfo: ExpiredTimerInfo): Iterator[MapOutputEvent] = { +var results = List[MapOutputEvent]() + +for (row <- inputRows) { + val resultIter = processRow(row, _mapState) + resultIter.foreach { r => +results = r :: results + } +} + +results.iterator + } + + def processRow( + row: MapInputEvent, + mapState: MapStateImplWithTTL[String, Int]):
Re: [PR] [SPARK-47825][DSTREAMS][3.5] Make `KinesisTestUtils` & `WriteInputFormatTestDataGenerator` deprecated [spark]
dongjoon-hyun commented on PR #46019: URL: https://github.com/apache/spark/pull/46019#issuecomment-2064652257 Hi, @cloud-fan and @HyukjinKwon. What do you think of the current (as-is) status? Could you review this once more when you have a chance?
Re: [PR] [SPARK-47898][SQL] Port HIVE-12270: Add DBTokenStore support to HS2 delegation token [spark]
dongjoon-hyun closed pull request #46115: [SPARK-47898][SQL] Port HIVE-12270: Add DBTokenStore support to HS2 delegation token URL: https://github.com/apache/spark/pull/46115
Re: [PR] [SPARK-47545][CONNECT] Dataset `observe` support for the Scala client [spark]
hvanhovell commented on code in PR #45701: URL: https://github.com/apache/spark/pull/45701#discussion_r1571122260

## connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
@@ -27,18 +27,22 @@
 import org.apache.arrow.vector.ipc.message.{ArrowMessage, ArrowRecordBatch}
 import org.apache.arrow.vector.types.pojo
 import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.ExecutePlanResponse.ObservedMetrics
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ProductEncoder, UnboundRowEncoder}
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.connect.client.arrow.{AbstractMessageIterator, ArrowDeserializingIterator, ConcatenatingArrowStreamReader, MessageIterator}
-import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, LiteralValueProtoConverter}
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.ArrowUtils

 private[sql] class SparkResult[T](
     responses: CloseableIterator[proto.ExecutePlanResponse],
     allocator: BufferAllocator,
     encoder: AgnosticEncoder[T],
-    timeZoneId: String)
+    timeZoneId: String,
+    setObservationMetricsOpt: Option[(Long, Option[Map[String, Any]]) => Unit] = None)

Review Comment: Why does the callback need to handle `None` values? From looking at the code, we will only invoke the callback when there is a meaningful result.
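The reviewer's point can be sketched as follows: if the producer guards the call site, the callback never needs to handle a missing-metrics case, so its signature can take a plain map. This is an illustrative Python sketch with hypothetical names (`ResultStream`, `_emit`), not the Spark Connect client's actual API:

```python
from typing import Callable, Optional

class ResultStream:
    """Toy stand-in for a result iterator that can report observed metrics."""

    def __init__(self, on_metrics: Optional[Callable[[int, dict], None]] = None):
        self._on_metrics = on_metrics

    def _emit(self, plan_id: int, metrics: Optional[dict]) -> None:
        # Guard at the call site: the callback is only invoked when there is a
        # meaningful result, so it never has to handle None itself.
        if self._on_metrics is not None and metrics is not None:
            self._on_metrics(plan_id, metrics)

seen = []
stream = ResultStream(on_metrics=lambda pid, m: seen.append((pid, m)))
stream._emit(1, {"min(id)": 0})
stream._emit(2, None)  # dropped: no meaningful result
```

The design choice is where the `None`-check lives: pushing it into the producer keeps every consumer's signature simple.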
Re: [PR] [MINOR][TESTS] Replace CONFIG_DIM1 with CONFIG_DIM2 in timestamp tests [spark]
dongjoon-hyun commented on PR #46119: URL: https://github.com/apache/spark/pull/46119#issuecomment-2064638166 Merged to master.
Re: [PR] [MINOR][TESTS] Replace CONFIG_DIM1 with CONFIG_DIM2 in timestamp tests [spark]
dongjoon-hyun closed pull request #46119: [MINOR][TESTS] Replace CONFIG_DIM1 with CONFIG_DIM2 in timestamp tests URL: https://github.com/apache/spark/pull/46119
Re: [PR] [SPARK-47545][CONNECT] Dataset `observe` support for the Scala client [spark]
hvanhovell commented on code in PR #45701: URL: https://github.com/apache/spark/pull/45701#discussion_r1571108161

## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
@@ -1511,6 +1514,46 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
     (0 until 5).foreach(i => assert(row.get(i * 2) === row.get(i * 2 + 1)))
   }
 }
+
+  test("Observable metrics") {
+    val df = spark.range(99).withColumn("extra", col("id") - 1)
+    val ob1 = new Observation("ob1")
+    val observedDf = df.observe(ob1, min("id"), avg("id"), max("id"))
+    val observedObservedDf = observedDf.observe("ob2", min("extra"), avg("extra"), max("extra"))
+
+    val ob1Schema = new StructType()
+      .add("min(id)", LongType)
+      .add("avg(id)", DoubleType)
+      .add("max(id)", LongType)
+    val ob2Schema = new StructType()
+      .add("min(extra)", LongType)
+      .add("avg(extra)", DoubleType)
+      .add("max(extra)", LongType)
+    val ob1Metrics = Map("ob1" -> new GenericRowWithSchema(Array(0, 49, 98), ob1Schema))
+    val ob2Metrics = Map("ob2" -> new GenericRowWithSchema(Array(-1, 48, 97), ob2Schema))
+
+    assert(df.collectResult().getObservedMetrics === Map.empty)
+    assert(observedDf.collectResult().getObservedMetrics === ob1Metrics)
+    assert(observedObservedDf.collectResult().getObservedMetrics === ob1Metrics ++ ob2Metrics)
+  }
+
+  test("Observation.get is blocked until the query is finished") {
+    val df = spark.range(99).withColumn("extra", col("id") - 1)
+    val observation = new Observation("ob1")
+    val observedDf = df.observe(observation, min("id"), avg("id"), max("id"))
+
+    // Start a new thread to get the observation
+    val future = Future(observation.get)(ExecutionContext.global)

Review Comment: For the record: IMO the observation class should have been using a future from the get-go.
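A blocking `get` backed by a future, as the reviewer suggests, can be sketched like this in Python (an illustrative stand-in, not Spark's `Observation` class):

```python
import threading
from concurrent.futures import Future

class Observation:
    """Toy observation whose get() blocks until the query sets its metrics."""

    def __init__(self, name: str):
        self.name = name
        self._future: "Future[dict]" = Future()

    def set_metrics(self, metrics: dict) -> None:
        # Called once by the "query" side when results are available.
        self._future.set_result(metrics)

    def get(self, timeout=None) -> dict:
        # Blocks the caller until set_metrics fires (or the timeout elapses).
        return self._future.result(timeout)

ob = Observation("ob1")
# Simulate the query finishing on another thread shortly after get() starts.
threading.Timer(0.05, ob.set_metrics, args=({"min(id)": 0},)).start()
metrics = ob.get(timeout=2)
```

Using a future makes the blocking semantics explicit and gives timeout and completion-check behavior for free, which is the point of the "from the get-go" remark.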
Re: [PR] [SPARK-47545][CONNECT] Dataset `observe` support for the Scala client [spark]
hvanhovell commented on code in PR #45701: URL: https://github.com/apache/spark/pull/45701#discussion_r1571105638

## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
@@ -813,6 +823,28 @@ class SparkSession private[sql] (
    * Set to false to prevent client.releaseSession on close() (testing only)
    */
   private[sql] var releaseSessionOnClose = true
+
+  private[sql] def registerObservation(planId: Long, observation: Observation): Unit = {
+    // makes this class thread-safe:
+    // only the first thread entering this block can set sparkSession
+    // all other threads will see the exception, as it is only allowed to do this once
+    observation.synchronized {
+      if (observationRegistry.contains(planId)) {
+        throw new IllegalArgumentException("An Observation can be used with a Dataset only once")
+      }
+      observationRegistry.put(planId, observation)
+    }
+  }
+
+  private[sql] def setMetricsAndUnregisterObservation(
+      planId: Long,
+      metrics: Option[Map[String, Any]]): Unit = {
+    observationRegistry.get(planId).map { observation =>
+      if (observation.setMetricsAndNotify(metrics)) {
+        observationRegistry.remove(planId)

Review Comment: Should this be tied to whether or not the observation has been successfully updated? Another question: under what circumstances can the metrics be empty?
Re: [PR] [SPARK-47545][CONNECT] Dataset `observe` support for the Scala client [spark]
hvanhovell commented on code in PR #45701: URL: https://github.com/apache/spark/pull/45701#discussion_r1571102181

## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
@@ -813,6 +823,28 @@ class SparkSession private[sql] (
    * Set to false to prevent client.releaseSession on close() (testing only)
    */
   private[sql] var releaseSessionOnClose = true
+
+  private[sql] def registerObservation(planId: Long, observation: Observation): Unit = {
+    // makes this class thread-safe:
+    // only the first thread entering this block can set sparkSession
+    // all other threads will see the exception, as it is only allowed to do this once
+    observation.synchronized {

Review Comment: We should either lock the map or use a concurrent hash map. Locking the observation should only be needed when we set the value, and even then the observation should be responsible for that.
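The suggested alternative, guarding the registry itself rather than synchronizing on the observation, might look like this minimal Python sketch (names are illustrative, not the actual client code; in Scala/Java a `ConcurrentHashMap` with `putIfAbsent` would serve the same role):

```python
import threading

class ObservationRegistry:
    """Register-once map keyed by plan ID, guarded by its own lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._by_plan_id = {}

    def register(self, plan_id: int, observation: object) -> None:
        # The lock protects the check-then-put on the map; the observation
        # itself is never used as the synchronization target.
        with self._lock:
            if plan_id in self._by_plan_id:
                raise ValueError(
                    "An Observation can be used with a Dataset only once")
            self._by_plan_id[plan_id] = observation

registry = ObservationRegistry()
registry.register(42, object())
try:
    registry.register(42, object())
    raised = False
except ValueError:
    raised = True
```

Locking the map keeps the invariant ("one registration per plan ID") where the state lives, instead of spreading it across the observations being registered.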
Re: [PR] [SPARK-47889][FOLLOWUP] Add `gradlew` to `.licenserc.yaml` [spark-kubernetes-operator]
viirya commented on PR #5: URL: https://github.com/apache/spark-kubernetes-operator/pull/5#issuecomment-2064537489 looks good to me.
Re: [PR] [SPARK-47805][SS] Implementing TTL for MapState [spark]
ericm-db commented on code in PR #45991: URL: https://github.com/apache/spark/pull/45991#discussion_r1571071937 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala: ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL} +import org.apache.spark.sql.execution.streaming.state.{PrefixKeyScanStateEncoderSpec, StateStore, StateStoreErrors} +import org.apache.spark.sql.streaming.{MapState, TTLConfig} +import org.apache.spark.util.NextIterator + +/** + * Class that provides a concrete implementation for map state associated with state + * variables (with ttl expiration support) used in the streaming transformWithState operator. 
+ * @param store - reference to the StateStore instance to be used for storing state + * @param stateName - name of the state variable + * @param keyExprEnc - Spark SQL encoder for key + * @param userKeyEnc - Spark SQL encoder for the map key + * @param valEncoder - SQL encoder for state variable + * @param ttlConfig - the ttl configuration (time to live duration etc.) + * @tparam K - type of key for map state variable + * @tparam V - type of value for map state variable + * @return - instance of MapState of type [K,V] that can be used to store state persistently + */ +class MapStateImplWithTTL[K, V]( +store: StateStore, +stateName: String, +keyExprEnc: ExpressionEncoder[Any], +userKeyEnc: Encoder[K], +valEncoder: Encoder[V], +ttlConfig: TTLConfig, +batchTimestampMs: Long) extends CompositeKeyTTLStateImpl(stateName, store, batchTimestampMs) + with MapState[K, V] with Logging { + + private val keySerializer = keyExprEnc.createSerializer() + private val stateTypesEncoder = new CompositeKeyStateEncoder( +keySerializer, userKeyEnc, valEncoder, COMPOSITE_KEY_ROW_SCHEMA, stateName, hasTtl = true) + + private val ttlExpirationMs = +StateTTL.calculateExpirationTimeForDuration(ttlConfig.ttlDuration, batchTimestampMs) + + initialize() + + private def initialize(): Unit = { +store.createColFamilyIfAbsent(stateName, COMPOSITE_KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL, + PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, 1)) + } + + /** Whether state exists or not. 
*/ + override def exists(): Boolean = { +iterator().nonEmpty + } + + /** Get the state value if it exists */ + override def getValue(key: K): V = { +StateStoreErrors.requireNonNullStateValue(key, stateName) +val encodedCompositeKey = stateTypesEncoder.encodeCompositeKey(key) +val retRow = store.get(encodedCompositeKey, stateName) + +if (retRow != null) { + val resState = stateTypesEncoder.decodeValue(retRow) + + if (!stateTypesEncoder.isExpired(retRow, batchTimestampMs)) { +resState + } else { +null.asInstanceOf[V] + } +} else { + null.asInstanceOf[V] +} + } + + /** Check if the user key is contained in the map */ + override def containsKey(key: K): Boolean = { +StateStoreErrors.requireNonNullStateValue(key, stateName) +getValue(key) != null + } + + /** Update value for given user key */ + override def updateValue(key: K, value: V): Unit = { +StateStoreErrors.requireNonNullStateValue(key, stateName) +StateStoreErrors.requireNonNullStateValue(value, stateName) +val encodedValue = stateTypesEncoder.encodeValue(value, ttlExpirationMs) +val encodedCompositeKey = stateTypesEncoder.encodeCompositeKey(key) +store.put(encodedCompositeKey, encodedValue, stateName) +val serializedGroupingKey = stateTypesEncoder.serializeGroupingKey() +val serializedUserKey = stateTypesEncoder.serializeUserKey(key) +upsertTTLForStateKey(ttlExpirationMs, serializedGroupingKey, serializedUserKey) + } + + /** Get the map associated with grouping key */ + override def iterator(): Iterator[(K, V)] = { +val
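The TTL read path quoted above can be sketched independently of the state store: each value is stored alongside an expiration timestamp, and reads treat expired entries as absent. This is an illustrative Python sketch, not the `MapStateImplWithTTL` implementation:

```python
class TTLMapState:
    """Toy map state whose reads enforce a per-entry TTL."""

    def __init__(self, ttl_ms: int, now_ms):
        self._ttl_ms = ttl_ms
        self._now_ms = now_ms  # injectable clock, like batchTimestampMs above
        self._store = {}       # user key -> (value, expiration timestamp ms)

    def update_value(self, key: str, value: object) -> None:
        # Mirrors calculateExpirationTimeForDuration: now + ttl duration.
        self._store[key] = (value, self._now_ms() + self._ttl_ms)

    def get_value(self, key: str):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        # An expired entry reads as absent, even though it is still stored.
        return None if self._now_ms() >= expires_at else value

clock = [0]
state = TTLMapState(ttl_ms=100, now_ms=lambda: clock[0])
state.update_value("key", 7)
before = state.get_value("key")  # within TTL
clock[0] = 150
after = state.get_value("key")   # past TTL: treated as absent
```

This mirrors the `getValue`/`isExpired` structure in the quoted Scala: expiry is checked on read, while physical cleanup happens separately via the TTL state.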
Re: [PR] [SPARK-47889][FOLLOWUP] Add `gradlew` to `.licenserc.yaml` [spark-kubernetes-operator]
dongjoon-hyun closed pull request #5: [SPARK-47889][FOLLOWUP] Add `gradlew` to `.licenserc.yaml` URL: https://github.com/apache/spark-kubernetes-operator/pull/5
Re: [PR] Operator 1.0.0-alpha [spark-kubernetes-operator]
dongjoon-hyun commented on PR #2: URL: https://github.com/apache/spark-kubernetes-operator/pull/2#issuecomment-2064486661 Please rebase this PR, @jiangzho.
Re: [PR] [SPARK-47889][FOLLOWUP] Add `gradlew` to `.licenserc.yaml` [spark-kubernetes-operator]
dongjoon-hyun commented on PR #5: URL: https://github.com/apache/spark-kubernetes-operator/pull/5#issuecomment-2064475304 Let me merge this to recover the CIs. cc @viirya
Re: [PR] [SPARK-47897][SQL][3.5] Fix ExpressionSet performance regression in scala 2.12 [spark]
minyyy commented on code in PR #46114: URL: https://github.com/apache/spark/pull/46114#discussion_r1571064621

## sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala:
@@ -108,12 +108,24 @@ class ExpressionSet protected(
     newSet
   }

+  override def ++(elems: GenTraversableOnce[Expression]): ExpressionSet = {

Review Comment: Could you add a comment on the method about why we don't use the `SetLike.default` here? Thanks.
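The performance point behind the overridden `++` can be sketched as follows: bulk-adding elements through the canonical-form set avoids the slower element-by-element `SetLike` default path, while still deduplicating semantically equal expressions. Canonicalization below is a toy (lowercasing strings), standing in for Catalyst's expression canonicalization:

```python
class ExpressionSet:
    """Toy set that deduplicates by a canonical form, like Catalyst's."""

    def __init__(self, originals=(), canonical=None):
        self._originals = list(originals)
        self._canonical = set(canonical) if canonical is not None else {
            self._canon(e) for e in originals}

    @staticmethod
    def _canon(expr: str) -> str:
        # Stand-in for expression canonicalization (e.g. `a + b` == `b + a`).
        return expr.lower()

    def add(self, expr: str) -> None:
        c = self._canon(expr)
        if c not in self._canonical:
            self._canonical.add(c)
            self._originals.append(expr)

    def concat(self, elems) -> "ExpressionSet":  # the ++ analogue
        # Copy both backing collections once, then add in bulk, instead of
        # rebuilding the whole set per element as a generic default might.
        new = ExpressionSet(self._originals, self._canonical)
        for e in elems:
            new.add(e)
        return new

s = ExpressionSet(["A + B"]).concat(["a + b", "C"])
```

Here `"a + b"` canonicalizes to the same form as `"A + B"` and is dropped, while `"C"` is appended; only the new elements touch the set.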
[PR] [SPARK-47889][FOLLOWUP] Add `gradlew` to .licenserc.yaml [spark-kubernetes-operator]
dongjoon-hyun opened a new pull request, #5: URL: https://github.com/apache/spark-kubernetes-operator/pull/5

### What changes were proposed in this pull request?
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-47805][SS] Implementing TTL for MapState [spark]
Re: [PR] [SPARK-47805][SS] Implementing TTL for MapState [spark]
ericm-db commented on code in PR #45991: URL: https://github.com/apache/spark/pull/45991#discussion_r1571061801 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImplWithTTL.scala: ## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL} +import org.apache.spark.sql.execution.streaming.state.{PrefixKeyScanStateEncoderSpec, StateStore, StateStoreErrors} +import org.apache.spark.sql.streaming.{MapState, TTLConfig} +import org.apache.spark.util.NextIterator + + +class MapStateImplWithTTL[K, V]( + store: StateStore, + stateName: String, + keyExprEnc: ExpressionEncoder[Any], + userKeyEnc: Encoder[K], + valEncoder: Encoder[V], + ttlConfig: TTLConfig, + batchTimestampMs: Long) extends CompositeKeyTTLStateImpl(stateName, store, batchTimestampMs) + with MapState[K, V] with Logging { + + private val keySerializer = keyExprEnc.createSerializer() + private val stateTypesEncoder = new CompositeKeyStateEncoder( +keySerializer, userKeyEnc, valEncoder, COMPOSITE_KEY_ROW_SCHEMA, stateName, hasTtl = true) + + private val ttlExpirationMs = +StateTTL.calculateExpirationTimeForDuration(ttlConfig.ttlDuration, batchTimestampMs) + + initialize() + + private def initialize(): Unit = { +store.createColFamilyIfAbsent(stateName, COMPOSITE_KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL, + PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, 1)) + } + + /** Whether state exists or not. 
*/ + override def exists(): Boolean = { +iterator().nonEmpty + } + + /** Get the state value if it exists */ + override def getValue(key: K): V = { +StateStoreErrors.requireNonNullStateValue(key, stateName) +val encodedCompositeKey = stateTypesEncoder.encodeCompositeKey(key) +val retRow = store.get(encodedCompositeKey, stateName) + +if (retRow != null) { + val resState = stateTypesEncoder.decodeValue(retRow) + + if (!stateTypesEncoder.isExpired(retRow, batchTimestampMs)) { +resState + } else { +null.asInstanceOf[V] + } +} else { + null.asInstanceOf[V] +} + } + + /** Check if the user key is contained in the map */ + override def containsKey(key: K): Boolean = { +StateStoreErrors.requireNonNullStateValue(key, stateName) +getValue(key) != null + } + + /** Update value for given user key */ + override def updateValue(key: K, value: V): Unit = { +StateStoreErrors.requireNonNullStateValue(key, stateName) +StateStoreErrors.requireNonNullStateValue(value, stateName) +val encodedValue = stateTypesEncoder.encodeValue(value, ttlExpirationMs) +val encodedCompositeKey = stateTypesEncoder.encodeCompositeKey(key) +store.put(encodedCompositeKey, encodedValue, stateName) +val serializedGroupingKey = stateTypesEncoder.serializeGroupingKey() +val serializedUserKey = stateTypesEncoder.serializeUserKey(key) +upsertTTLForStateKey(ttlExpirationMs, serializedGroupingKey, serializedUserKey) + } + + /** Get the map associated with grouping key */ + override def iterator(): Iterator[(K, V)] = { +val encodedGroupingKey = stateTypesEncoder.encodeGroupingKey() +val unsafeRowPairIterator = store.prefixScan(encodedGroupingKey, stateName) +new NextIterator[(K, V)] { + override protected def getNext(): (K, V) = { +val iter = unsafeRowPairIterator.dropWhile { rowPair => + stateTypesEncoder.isExpired(rowPair.value, batchTimestampMs) +} +if (iter.hasNext) { + val currentRowPair = iter.next() + val key = stateTypesEncoder.decodeCompositeKey(currentRowPair.key) + val value = 
stateTypesEncoder.decodeValue(currentRowPair.value) + (key, value) +} else { + finished = true + null.asInstanceOf[(K, V)] +} + } + + override protected def close(): Unit = {} +} + } + +
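The TTL bookkeeping in the diff above hinges on one piece of arithmetic: `StateTTL.calculateExpirationTimeForDuration` derives an expiration timestamp from the configured duration and the batch timestamp, and `isExpired` compares a stored expiration against the current batch timestamp. A minimal standalone sketch of that logic (hypothetical helper names and semantics, not the actual Spark implementation):

```scala
import java.time.Duration

// Hypothetical standalone model of the TTL arithmetic used by MapStateImplWithTTL.
object TtlSketch {
  // Expiration = batch timestamp + configured TTL duration.
  def expirationMs(ttl: Duration, batchTimestampMs: Long): Long =
    batchTimestampMs + ttl.toMillis

  // A value counts as expired once the current batch timestamp reaches its expiration.
  def isExpired(expirationMs: Long, batchTimestampMs: Long): Boolean =
    batchTimestampMs >= expirationMs
}
```

With a one-minute TTL and a batch at t = 1000 ms, the value expires at 61000 ms, so a lookup in a later batch takes the `null.asInstanceOf[V]` branch of `getValue` above rather than returning the stale value.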
[PR] [AUTO][SC-162831][SPARK-47371][SQL][FOLLOWUP] XML: Stop ignoring CDATA within row tags [spark]
yhosny opened a new pull request, #46121: URL: https://github.com/apache/spark/pull/46121 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47889] Setup gradle as build tool for operator repository [spark-kubernetes-operator]
dongjoon-hyun commented on PR #4: URL: https://github.com/apache/spark-kubernetes-operator/pull/4#issuecomment-2064461743 Oh, my bad. I didn't check the PR builder on this PR.
Re: [PR] [SPARK-47805][SS] Implementing TTL for MapState [spark]
ericm-db commented on code in PR #45991: URL: https://github.com/apache/spark/pull/45991#discussion_r1571060613 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateTTLSuite.scala: ## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import java.time.Duration + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.execution.streaming.{MapStateImplWithTTL, MemoryStream} +import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.util.StreamManualClock + +class MapStateSingleKeyTTLProcessor(ttlConfig: TTLConfig) + extends StatefulProcessor[String, InputEvent, OutputEvent] +with Logging { + + @transient private var _mapState: MapStateImplWithTTL[String, Int] = _ + + override def init( + outputMode: OutputMode, + timeMode: TimeMode): Unit = { +_mapState = getHandle + .getMapState("mapState", Encoders.STRING, Encoders.scalaInt, ttlConfig) + .asInstanceOf[MapStateImplWithTTL[String, Int]] + } + override def handleInputRows( +key: String, +inputRows: Iterator[InputEvent], +timerValues: TimerValues, +expiredTimerInfo: ExpiredTimerInfo): Iterator[OutputEvent] = { +var results = List[OutputEvent]() + +for (row <- inputRows) { + val resultIter = processRow(row, _mapState) + resultIter.foreach { r => +results = r :: results + } +} + +results.iterator + } + + def processRow( + row: InputEvent, + mapState: MapStateImplWithTTL[String, Int]): Iterator[OutputEvent] = { +var results = List[OutputEvent]() +val key = row.key +val userKey = "key" +if (row.action == "get") { + if (mapState.containsKey(userKey)) { +results = OutputEvent(key, mapState.getValue(userKey), isTTLValue = false, -1) :: results + } +} else if (row.action == "get_without_enforcing_ttl") { + val currState = mapState.getWithoutEnforcingTTL(userKey) + if (currState.isDefined) { +results = OutputEvent(key, currState.get, isTTLValue = false, -1) :: results + } +} else if (row.action == "get_ttl_value_from_state") { + val ttlValue = mapState.getTTLValue(userKey) + if (ttlValue.isDefined) { +val value = ttlValue.get._1 +val ttlExpiration = 
ttlValue.get._2 +results = OutputEvent(key, value, isTTLValue = true, ttlExpiration) :: results + } +} else if (row.action == "put") { + mapState.updateValue(userKey, row.value) +} else if (row.action == "get_values_in_ttl_state") { + val ttlValues = mapState.getValuesInTTLState() + ttlValues.foreach { v => +results = OutputEvent(key, -1, isTTLValue = true, ttlValue = v) :: results + } +} + +results.iterator + } +} + +case class MapInputEvent( +key: String, +userKey: String, +action: String, +value: Int) + +case class MapOutputEvent( +key: String, +userKey: String, +value: Int, +isTTLValue: Boolean, +ttlValue: Long) + +class MapStateTTLProcessor(ttlConfig: TTLConfig) + extends StatefulProcessor[String, MapInputEvent, MapOutputEvent] +with Logging { + + @transient private var _mapState: MapStateImplWithTTL[String, Int] = _ + + override def init( + outputMode: OutputMode, + timeMode: TimeMode): Unit = { +_mapState = getHandle + .getMapState("mapState", Encoders.STRING, Encoders.scalaInt, ttlConfig) + .asInstanceOf[MapStateImplWithTTL[String, Int]] + } + + override def handleInputRows( + key: String, + inputRows: Iterator[MapInputEvent], + timerValues: TimerValues, + expiredTimerInfo: ExpiredTimerInfo): Iterator[MapOutputEvent] = { +var results = List[MapOutputEvent]() + +for (row <- inputRows) { + val resultIter = processRow(row, _mapState) + resultIter.foreach { r => +results = r :: results + } +} + +results.iterator + } + + def processRow( + row: MapInputEvent, + mapState: MapStateImplWithTTL[String, Int]):
Re: [PR] [SPARK-47889] Setup gradle as build tool for operator repository [spark-kubernetes-operator]
dongjoon-hyun closed pull request #4: [SPARK-47889] Setup gradle as build tool for operator repository URL: https://github.com/apache/spark-kubernetes-operator/pull/4
Re: [PR] [SPARK-47412][SQL] Add Collation Support for LPad/RPad. [spark]
uros-db commented on code in PR #46041: URL: https://github.com/apache/spark/pull/46041#discussion_r1571038691 ## sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala: ## @@ -425,6 +421,74 @@ class CollationStringExpressionsSuite }) } + test("Support StringRPad string expressions with collation") { +// Supported collations +case class StringRPadTestCase[R](s: String, len: Int, pad: String, c: String, result: R) +val testCases = Seq( + StringRPadTestCase("", 5, " ", "UTF8_BINARY", " "), + StringRPadTestCase("abc", 5, " ", "UNICODE", "abc "), + StringRPadTestCase("Hello", 7, "Wörld", "UTF8_BINARY_LCASE", "HelloWö"), // scalastyle:ignore + StringRPadTestCase("1234567890", 5, "aaaAAa", "UNICODE_CI", "12345"), + StringRPadTestCase("aaAA", 2, " ", "UTF8_BINARY", "aa"), + StringRPadTestCase("ÀÃÂĀĂȦÄäåäáâãȻǢǼÆ℀℃", 2, "1", "UNICODE", "ÀÃ"), // scalastyle:ignore + StringRPadTestCase("ĂȦÄäåäá", 20, "ÀÃÂĀĂȦÄäåäáâãȻǢǼÆ", "UTF8_BINARY_LCASE", "ĂȦÄäåäáÀÃÂĀĂȦÄäåäáâã"), // scalastyle:ignore + StringRPadTestCase("aȦÄä", 8, "a1", "UNICODE_CI", "aȦÄäa1a1") // scalastyle:ignore Review Comment: don't escape scalastyle guides, especially not for line length it's completely alright to break a line in 2, or shorten `ÀÃÂĀĂȦÄäåäáâãȻǢǼÆ` to something shorter like `ÃäĂåȻǢÆ` in some test cases ## sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala: ## @@ -425,6 +421,74 @@ class CollationStringExpressionsSuite }) } + test("Support StringRPad string expressions with collation") { +// Supported collations +case class StringRPadTestCase[R](s: String, len: Int, pad: String, c: String, result: R) +val testCases = Seq( + StringRPadTestCase("", 5, " ", "UTF8_BINARY", " "), + StringRPadTestCase("abc", 5, " ", "UNICODE", "abc "), + StringRPadTestCase("Hello", 7, "Wörld", "UTF8_BINARY_LCASE", "HelloWö"), // scalastyle:ignore + StringRPadTestCase("1234567890", 5, "aaaAAa", "UNICODE_CI", "12345"), + StringRPadTestCase("aaAA", 2, " ", 
"UTF8_BINARY", "aa"), + StringRPadTestCase("ÀÃÂĀĂȦÄäåäáâãȻǢǼÆ℀℃", 2, "1", "UNICODE", "ÀÃ"), // scalastyle:ignore + StringRPadTestCase("ĂȦÄäåäá", 20, "ÀÃÂĀĂȦÄäåäáâãȻǢǼÆ", "UTF8_BINARY_LCASE", "ĂȦÄäåäáÀÃÂĀĂȦÄäåäáâã"), // scalastyle:ignore + StringRPadTestCase("aȦÄä", 8, "a1", "UNICODE_CI", "aȦÄäa1a1") // scalastyle:ignore +) +testCases.foreach(t => { + val query = s"SELECT rpad(collate('${t.s}', '${t.c}')," + +s" ${t.len}, collate('${t.pad}', '${t.c}'))" + // Result & data type + checkAnswer(sql(query), Row(t.result)) + assert(sql(query).schema.fields.head.dataType.sameType(StringType(t.c))) + // Implicit casting + checkAnswer( +sql(s"SELECT rpad(collate('${t.s}', '${t.c}'), ${t.len}, '${t.pad}')"), +Row(t.result)) + checkAnswer( +sql(s"SELECT rpad('${t.s}', ${t.len}, collate('${t.pad}', '${t.c}'))"), +Row(t.result)) +}) +// Collation mismatch +val collationMismatch = intercept[AnalysisException] { + sql("SELECT rpad(collate('abcde', 'UNICODE_CI'),1,collate('C', 'UTF8_BINARY_LCASE'))") +} +assert(collationMismatch.getErrorClass === "COLLATION_MISMATCH.EXPLICIT") + } + + test("Support StringLPad string expressions with collation") { +// Supported collations +case class StringLPadTestCase[R](s: String, len: Int, pad: String, c: String, result: R) +val testCases = Seq( + StringLPadTestCase("", 5, " ", "UTF8_BINARY", " "), + StringLPadTestCase("abc", 5, " ", "UNICODE", " abc"), + StringLPadTestCase("Hello", 7, "Wörld", "UTF8_BINARY_LCASE", "WöHello"), // scalastyle:ignore + StringLPadTestCase("1234567890", 5, "aaaAAa", "UNICODE_CI", "12345"), + StringLPadTestCase("aaAA", 2, " ", "UTF8_BINARY", "aa"), + StringLPadTestCase("ÀÃÂĀĂȦÄäåäáâãȻǢǼÆ℀℃", 2, "1", "UNICODE", "ÀÃ"), // scalastyle:ignore + StringLPadTestCase("ĂȦÄäåäá", 20, "ÀÃÂĀĂȦÄäåäáâãȻǢǼÆ", "UTF8_BINARY_LCASE", "ÀÃÂĀĂȦÄäåäáâãĂȦÄäåäá"), // scalastyle:ignore + StringLPadTestCase("aȦÄä", 8, "a1", "UNICODE_CI", "a1a1aȦÄä") // scalastyle:ignore Review Comment: ditto (see above) -- This is an automated message from the 
Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
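As the reviewer suggests, a too-long test-case line can simply be split with string concatenation instead of suppressing the check. An illustrative snippet (not the actual test code):

```scala
// Splitting one long literal across lines keeps scalastyle's 100-char limit
// satisfied without a `// scalastyle:ignore` comment; the result is identical
// to the single-line literal.
val pad: String =
  "ÀÃÂĀĂȦÄ" +
  "äåäáâãȻǢǼÆ"
```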
Re: [PR] [SPARK-47412][SQL] Add Collation Support for LPad/RPad. [spark]
uros-db commented on code in PR #46041: URL: https://github.com/apache/spark/pull/46041#discussion_r1571035111 ## sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala: ## @@ -323,10 +323,6 @@ class CollationStringExpressionsSuite |select overlay(collate('${t.l}', '${t.c}') placing |collate('${t.r}', '${t.c}') from ${t.pos}) |""".stripMargin - // Result & data type - checkAnswer(sql(query), Row(t.result)) - assert(sql(query).schema.fields.head.dataType.sameType(StringType(t.c))) - // Implicit casting Review Comment: don't make these changes
Re: [PR] [SPARK-47413][SQL] - add support to substr/left/right for collations [spark]
uros-db commented on code in PR #46040: URL: https://github.com/apache/spark/pull/46040#discussion_r1571028944 ## sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala: ## @@ -425,6 +425,54 @@ class CollationStringExpressionsSuite }) } + test("Support Left/Right/Substr with collation") { +case class SubstringTestCase( +method: String, +str: String, +len: String, +pad: Option[String], +collation: String, +result: Row) { + val strString = if (str == "null") "null" else s"'$str'" + val query = +s"SELECT $method(collate($strString, '$collation'), $len${pad.map(p => s", '$p'").getOrElse("")})" // scalastyle:ignore line.size.limit Review Comment: it's alright to break a row; we shouldn't normally use `// scalastyle:ignore line.size.limit`
Re: [PR] [SPARK-47413][SQL] - add support to substr/left/right for collations [spark]
uros-db commented on code in PR #46040: URL: https://github.com/apache/spark/pull/46040#discussion_r1571030478 ## sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala: ## @@ -425,6 +425,54 @@ class CollationStringExpressionsSuite }) } + test("Support Left/Right/Substr with collation") { +case class SubstringTestCase( +method: String, +str: String, +len: String, +pad: Option[String], +collation: String, +result: Row) { + val strString = if (str == "null") "null" else s"'$str'" + val query = +s"SELECT $method(collate($strString, '$collation'), $len${pad.map(p => s", '$p'").getOrElse("")})" // scalastyle:ignore line.size.limit +} + +val checks = Seq( + SubstringTestCase("substr", "example", "1", Some("100"), "utf8_binary_lcase", Row("example")), + SubstringTestCase("substr", "example", "2", Some("2"), "utf8_binary", Row("xa")), + SubstringTestCase("right", "", "1", None, "utf8_binary_lcase", Row("")), + SubstringTestCase("substr", "example", "0", Some("0"), "unicode", Row("")), + SubstringTestCase("substr", "example", "-3", Some("2"), "unicode_ci", Row("pl")), + SubstringTestCase("substr", " a世a ", "2", Some("3"), "utf8_binary_lcase", Row("a世a")), + SubstringTestCase("left", " a世a ", "3", None, "utf8_binary", Row(" a世")), + SubstringTestCase("right", " a世a ", "3", None, "unicode", Row("世a ")), + SubstringTestCase("left", "ÀÃÂĀĂȦÄäåäáâãȻǢǼÆ", "3", None, "unicode_ci", Row("ÀÃÂ")), + SubstringTestCase("right", "ÀÃÂĀĂȦÄäåäáâãȻǢǼÆ", "3", None, "utf8_binary_lcase", Row("ǢǼÆ")), // scalastyle:ignore line.size.limit Review Comment: ditto here; just shorten the string a bit and it should fit into 100 chars :)
[PR] [SPARK-47902][SQL] Making Compute Current Time* expressions foldable [spark]
dbatomic opened a new pull request, #46120: URL: https://github.com/apache/spark/pull/46120 ### What changes were proposed in this pull request? This PR is a follow-up of [this](https://github.com/apache/spark/pull/44261) PR, which made the "compute current time" family of expressions `Unevaluable`. Implicitly, all `Unevaluable` expressions are also not foldable. Hence, we have a regression compared to the state prior to 44261, where "compute current time" expressions could be used in places where constant folding is required. The proposed change is to keep these expressions `Unevaluable`, in the sense that `eval`/`codeGen` can't be called, but to allow folding. This is a special case given that these expressions are supposed to be replaced by the QO with literals (which are foldable), so the proposal is to create a new interface for this family of expressions. ### Why are the changes needed? Explained above. ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? An additional test is added that ensures that `CurrentDate` can be used in places that require folding. ### Was this patch authored or co-authored using generative AI tooling? No.
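The design being proposed — an expression whose `eval` must never run, but which the optimizer may still treat as a constant — can be pictured with a toy expression tree (hypothetical types for illustration; the real Catalyst interfaces differ):

```scala
// Toy model of "unevaluable but foldable": the optimizer substitutes a
// literal for the expression, so eval() is never reached at runtime.
sealed trait Expr { def foldable: Boolean; def eval(): Any }

final case class Lit(value: Any) extends Expr {
  val foldable = true
  def eval(): Any = value
}

// Stands in for CurrentDate: foldable, yet throws if evaluated directly.
case object CurrentDateExpr extends Expr {
  val foldable = true
  def eval(): Any =
    throw new UnsupportedOperationException("replaced by the optimizer")
}

// A ComputeCurrentTime-style rewrite: pin the value once per query.
def replaceCurrentTime(e: Expr, today: String): Expr = e match {
  case CurrentDateExpr => Lit(today)
  case other           => other
}
```

After the rewrite, constant folding only ever sees an ordinary literal, which is the behavior the PR wants to restore.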
Re: [PR] [DRAFT][SPARK-47414][SQL] Lowercase collation support for regexp expressions [spark]
uros-db commented on code in PR #46077: URL: https://github.com/apache/spark/pull/46077#discussion_r1570971222 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollationExpressionSuite.scala: ## @@ -161,4 +162,40 @@ class CollationExpressionSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(ArrayExcept(left, right), out) } } + + test("MultiLikeBase regexp expressions with collated strings") { Review Comment: with StringExpressions, we had a situation where interpreted & codegen codepaths defaulted to a single function call, so unit tests in CollationSupportSuite were enough to verify everything goes well there. However, here we have to make traditional unit tests in order to verify the behaviour for both interpreted (nullSafeEval) & codegen (doGenCode) execution (sql tests don't seem to cover both). This PR will only be correct once I add unit tests for all other regexp expressions too, and I figured CollationExpressionSuite is the correct place to do it.
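For context, the suite in question mixes in `ExpressionEvalHelper`, whose `checkEvaluation` is generally what exercises both execution paths. The core contract can be sketched in a few lines (a toy model under that assumption, not the Spark helper):

```scala
// Mini-model of "test both execution paths": evaluate an expression through
// two independent codepaths and require both to agree with the expected value.
def checkBothPaths[A](interpreted: () => A, codegen: () => A, expected: A): Unit = {
  assert(interpreted() == expected, "interpreted path diverged")
  assert(codegen() == expected, "codegen path diverged")
}
```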
Re: [PR] [DRAFT][SPARK-47414][SQL] Lowercase collation support for regexp expressions [spark]
uros-db commented on code in PR #46077: URL: https://github.com/apache/spark/pull/46077#discussion_r1570967566 ## common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java: ## @@ -143,7 +145,24 @@ public static boolean execICU(final UTF8String l, final UTF8String r, * Collation-aware regexp expressions. */ - // TODO: Add more collation-aware regexp expressions. + public static boolean supportsLowercaseRegex(final int collationId) { Review Comment: there is no behaviour to test in CollationSupportSuite; all collation awareness here is embedded directly in regexpExpressions instead. This one is gonna have to be a bit different than StringExpressions (in terms of testing).
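One plausible way to picture what a "lowercase regex" collation means in practice (an assumption for illustration — the PR embeds the awareness inside the regexp expressions themselves, not in a helper like this):

```scala
import java.util.regex.Pattern

// Sketch: for a lowercase-style collation (e.g. UTF8_BINARY_LCASE), regex
// matching can be approximated by compiling the pattern with
// case-insensitive Unicode matching.
def lowercaseRegexMatches(regex: String, input: String): Boolean =
  Pattern.compile(regex, Pattern.CASE_INSENSITIVE | Pattern.UNICODE_CASE)
    .matcher(input)
    .matches()
```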
Re: [PR] [DRAFT][SPARK-47414][SQL] Lowercase collation support for regexp expressions [spark]
nikolamand-db commented on code in PR #46077: URL: https://github.com/apache/spark/pull/46077#discussion_r1570959180 ## sql/core/src/test/scala/org/apache/spark/sql/CollationRegexpExpressionsSuite.scala: ## @@ -34,288 +34,380 @@ class CollationRegexpExpressionsSuite // Supported collations case class LikeTestCase[R](l: String, r: String, c: String, result: R) val testCases = Seq( - LikeTestCase("ABC", "%B%", "UTF8_BINARY", true) + LikeTestCase("ABC", "%B%", "UTF8_BINARY", true), + LikeTestCase("AḂC", "%ḃ%", "UTF8_BINARY_LCASE", true), // scalastyle:ignore + LikeTestCase("ABC", "%b%", "UNICODE", false) ) testCases.foreach(t => { - val query = s"SELECT like(collate('${t.l}', '${t.c}'), collate('${t.r}', '${t.c}'))" + val query = s"SELECT like(collate('${t.l}', '${t.c}'), '${t.r}')" // Result & data type checkAnswer(sql(query), Row(t.result)) assert(sql(query).schema.fields.head.dataType.sameType(BooleanType)) - // TODO: Implicit casting (not currently supported) }) // Unsupported collations case class LikeTestFail(l: String, r: String, c: String) val failCases = Seq( - LikeTestFail("ABC", "%b%", "UTF8_BINARY_LCASE"), - LikeTestFail("ABC", "%B%", "UNICODE"), LikeTestFail("ABC", "%b%", "UNICODE_CI") ) failCases.foreach(t => { - val query = s"SELECT like(collate('${t.l}', '${t.c}'), collate('${t.r}', '${t.c}'))" + val query = s"SELECT like(collate('${t.l}', '${t.c}'), '${t.r}')" val unsupportedCollation = intercept[AnalysisException] { sql(query) } assert(unsupportedCollation.getErrorClass === "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE") }) -// TODO: Collation mismatch (not currently supported) } test("Support ILike string expression with collation") { // Supported collations case class ILikeTestCase[R](l: String, r: String, c: String, result: R) val testCases = Seq( - ILikeTestCase("ABC", "%b%", "UTF8_BINARY", true) + ILikeTestCase("ABC", "%b%", "UTF8_BINARY", true), + ILikeTestCase("AḂC", "%ḃ%", "UTF8_BINARY_LCASE", true), // scalastyle:ignore Review Comment: Let's wrap 
the entire class with ascii check ignore such as in https://github.com/apache/spark/blob/6232085227ee2cc4e831996a1ac84c27868a1595/sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala#L27 ## common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java: ## @@ -143,7 +145,24 @@ public static boolean execICU(final UTF8String l, final UTF8String r, * Collation-aware regexp expressions. */ - // TODO: Add more collation-aware regexp expressions. + public static boolean supportsLowercaseRegex(final int collationId) { Review Comment: Should we add thorough unit tests for these functions in `CollationSupportSuite`? ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollationExpressionSuite.scala: ## @@ -161,4 +162,40 @@ class CollationExpressionSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(ArrayExcept(left, right), out) } } + + test("MultiLikeBase regexp expressions with collated strings") { Review Comment: Why do we need this test if we already have checks for `LikeAll`, `LikeAny`, etc. in `CollationRegexpExpressionsSuite`?
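The suggestion to wrap the whole suite, rather than sprinkling per-line ignores, uses scalastyle's comment-filter syntax and looks roughly like this (assuming the rule id is `nonascii`, as in the linked suite):

```scala
// scalastyle:off nonascii
// Non-ASCII literals are allowed anywhere inside the off/on region,
// so no per-line `// scalastyle:ignore` comments are needed.
class CollatedExamplesSuite {
  val collated: String = "AḂC"
}
// scalastyle:on nonascii
```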
Re: [PR] [SPARK-25769][SPARK-34636][SPARK-34626][SQL] sql method in UnresolvedAttribute, AttributeReference and Alias don't quote qualified names properly. [spark]
IgorBerman commented on PR #31754: URL: https://github.com/apache/spark/pull/31754#issuecomment-2064149586 This change might create confusion when looking at TreeNode.sql(). Currently, columns do not always have backticks: instead of `column`.`nestedColumn` we get column.`nestedColumn`, so there is no visible difference between that and tableName.`nestedColumn`, i.e. it is not clear whether "column" is a qualifier or a column name. I'm not sure what the ANSI standard says about this.
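[Editor's note] The ambiguity described above disappears if every part of a qualified name is backtick-quoted. A minimal sketch of that quoting rule, with hypothetical names (this is not Spark's actual `quoteIfNeeded` helper):

```java
// Hedged sketch: quote every part of a qualified name so that
// `column`.`nestedColumn` cannot be confused with tableName.`nestedColumn`.
public class QuoteSketch {
    // Wrap one identifier part in backticks, doubling embedded backticks.
    static String quote(String part) {
        return "`" + part.replace("`", "``") + "`";
    }

    // Join quoted parts with dots: ["column", "nestedColumn"]
    // becomes `column`.`nestedColumn`.
    static String quoteQualified(String... parts) {
        java.util.StringJoiner joiner = new java.util.StringJoiner(".");
        for (String part : parts) {
            joiner.add(quote(part));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        System.out.println(quoteQualified("column", "nestedColumn"));
        System.out.println(quoteQualified("tableName", "nestedColumn"));
    }
}
```

With every part quoted, a reader can always tell where the qualifier ends and the column name begins.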
Re: [PR] [SPARK-47887][CONNECT] Remove unused import `spark/connect/common.proto` from `spark/connect/relations.proto` [spark]
dongjoon-hyun closed pull request #46106: [SPARK-47887][CONNECT] Remove unused import `spark/connect/common.proto` from `spark/connect/relations.proto` URL: https://github.com/apache/spark/pull/46106
Re: [PR] [SPARK-47893][BUILD] Upgrade ASM to 9.7 [spark]
dongjoon-hyun closed pull request #46110: [SPARK-47893][BUILD] Upgrade ASM to 9.7 URL: https://github.com/apache/spark/pull/46110
Re: [PR] [SPARK-47897][SQL][3.5] Fix ExpressionSet performance regression in scala 2.12 [spark]
dongjoon-hyun commented on PR #46114: URL: https://github.com/apache/spark/pull/46114#issuecomment-2064075288 cc @LuciferYang , @viirya , too
Re: [PR] [SPARK-47463][SQL][3.5] Use V2Predicate to wrap expression with return type of boolean [spark]
cloud-fan closed pull request #46074: [SPARK-47463][SQL][3.5] Use V2Predicate to wrap expression with return type of boolean URL: https://github.com/apache/spark/pull/46074