[PR] [WIP] test java toUpperCase & toLowerCase [spark]

2024-04-19 Thread via GitHub


panbingkun opened a new pull request, #46147:
URL: https://github.com/apache/spark/pull/46147

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47845][SQL][PYTHON][CONNECT] Support Column type in split function for scala and python [spark]

2024-04-19 Thread via GitHub


CTCC1 commented on code in PR #46045:
URL: https://github.com/apache/spark/pull/46045#discussion_r1573145872


##
python/pyspark/sql/connect/functions/builtin.py:
##
@@ -2476,8 +2476,26 @@ def repeat(col: "ColumnOrName", n: Union["ColumnOrName", int]) -> Column:
 repeat.__doc__ = pysparkfuncs.repeat.__doc__
 
 
-def split(str: "ColumnOrName", pattern: str, limit: int = -1) -> Column:
-    return _invoke_function("split", _to_col(str), lit(pattern), lit(limit))
+def split(
+    str: "ColumnOrName",
+    pattern: Union[Column, str],
+    limit: Union["ColumnOrName", int] = -1,
+) -> Column:
+    # work around shadowing of str in the input variable name
+    from builtins import str as py_str
+
+    if isinstance(pattern, py_str):
+        _pattern = lit(pattern)
+    elif isinstance(pattern, Column):
+        _pattern = pattern
+    else:
+        raise PySparkTypeError(
+            error_class="NOT_COLUMN_OR_STR",
+            message_parameters={"arg_name": "pattern", "arg_type": type(pattern).__name__},
+        )
+
+    limit = lit(limit) if isinstance(limit, int) else _to_col(limit)
+    return _invoke_function("split", _to_col(str), _pattern, limit)

Review Comment:
   Sure. I removed the type check now. Maybe in the future we can standardize
this, e.g. with a decorator that inspects the function signature and does type
checks accordingly.
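
   For reference, here is a minimal usage sketch of what this change enables
(assuming an active `SparkSession` named `spark`; the sample data is
illustrative, not taken from this PR):

```python
from pyspark.sql import functions as F

df = spark.createDataFrame([("a,b,c", ","), ("1;2;3", ";")], ["s", "sep"])

# `pattern` (and `limit`) can now be passed as Columns, not only as literals.
df.select(F.split(F.col("s"), F.col("sep")).alias("parts")).show()
```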



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47845][SQL][PYTHON][CONNECT] Support Column type in split function for scala and python [spark]

2024-04-19 Thread via GitHub


liucao-dd commented on code in PR #46045:
URL: https://github.com/apache/spark/pull/46045#discussion_r1573144905


##
python/pyspark/sql/connect/functions/builtin.py:
##
@@ -2476,8 +2476,26 @@ def repeat(col: "ColumnOrName", n: Union["ColumnOrName", int]) -> Column:
 repeat.__doc__ = pysparkfuncs.repeat.__doc__
 
 
-def split(str: "ColumnOrName", pattern: str, limit: int = -1) -> Column:
-    return _invoke_function("split", _to_col(str), lit(pattern), lit(limit))
+def split(
+    str: "ColumnOrName",
+    pattern: Union[Column, str],
+    limit: Union["ColumnOrName", int] = -1,
+) -> Column:
+    # work around shadowing of str in the input variable name
+    from builtins import str as py_str
+
+    if isinstance(pattern, py_str):
+        _pattern = lit(pattern)
+    elif isinstance(pattern, Column):
+        _pattern = pattern
+    else:
+        raise PySparkTypeError(
+            error_class="NOT_COLUMN_OR_STR",
+            message_parameters={"arg_name": "pattern", "arg_type": type(pattern).__name__},
+        )
+
+    limit = lit(limit) if isinstance(limit, int) else _to_col(limit)
+    return _invoke_function("split", _to_col(str), _pattern, limit)

Review Comment:
   Sure. I removed the type check now. Maybe in the future we can standardize
this, e.g. with a decorator that inspects the function signature and does type
checks accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47925][SQL][TESTS] Mark `BloomFilterAggregateQuerySuite` as `ExtendedSQLTest` [spark]

2024-04-19 Thread via GitHub


dongjoon-hyun closed pull request #46145: [SPARK-47925][SQL][TESTS] Mark 
`BloomFilterAggregateQuerySuite` as `ExtendedSQLTest`
URL: https://github.com/apache/spark/pull/46145


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47925][SQL][TESTS] Mark `BloomFilterAggregateQuerySuite` as `ExtendedSQLTest` [spark]

2024-04-19 Thread via GitHub


dongjoon-hyun commented on PR #46145:
URL: https://github.com/apache/spark/pull/46145#issuecomment-2067526908

   Thank you, @HyukjinKwon . Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47924][CORE] Add a DEBUG log to `DiskStore.moveFileToBlock` [spark]

2024-04-19 Thread via GitHub


dongjoon-hyun closed pull request #46144: [SPARK-47924][CORE] Add a DEBUG log 
to `DiskStore.moveFileToBlock`
URL: https://github.com/apache/spark/pull/46144


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47923][R] Upgrade the minimum version of `arrow` R package to 10.0.0 [spark]

2024-04-19 Thread via GitHub


dongjoon-hyun closed pull request #46142: [SPARK-47923][R] Upgrade the minimum 
version of `arrow` R package to 10.0.0
URL: https://github.com/apache/spark/pull/46142


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47924][CORE] Add a DEBUG log to `DiskStore.moveFileToBlock` [spark]

2024-04-19 Thread via GitHub


dongjoon-hyun commented on PR #46144:
URL: https://github.com/apache/spark/pull/46144#issuecomment-2067525455

   Thank you, @HyukjinKwon . Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47923][R] Upgrade the minimum version of `arrow` R package to 10.0.0 [spark]

2024-04-19 Thread via GitHub


dongjoon-hyun commented on PR #46142:
URL: https://github.com/apache/spark/pull/46142#issuecomment-2067525074

   Thank you, @HyukjinKwon ! Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47907] Put bang under a config [spark]

2024-04-19 Thread via GitHub


srielau commented on PR #46138:
URL: https://github.com/apache/spark/pull/46138#issuecomment-2067511478

   @cloud-fan @gengliangwang This is ready for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-19 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1573100752


##
python/pyspark/sql/worker/plan_data_source_read.py:
##
@@ -51,6 +52,71 @@
 )
 
 
+def records_to_arrow_batches(
+output_iter: Iterator[Tuple],
+max_arrow_batch_size: int,
+return_type: StructType,
+data_source: DataSource,
+) -> Iterable[pa.RecordBatch]:

Review Comment:
   docstring added.
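
   For context, a sketch of the kind of docstring added here (the exact wording
in the PR may differ):

```python
def records_to_arrow_batches(
    output_iter: Iterator[Tuple],
    max_arrow_batch_size: int,
    return_type: StructType,
    data_source: DataSource,
) -> Iterable[pa.RecordBatch]:
    """
    Convert an iterator of Python tuples produced by a data source read into
    Arrow record batches matching ``return_type``, splitting the output so
    that no batch exceeds ``max_arrow_batch_size`` records.
    """
```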



##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry(InputPartition):
+def __init__(self, start: dict, end: dict, it: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.it = it
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers.
+Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+allows reading data and planning the latest offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+"""
+Read all available data from start offset and return the offset that 
next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end offset of this read attempt and the start of 
next read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+"""
+Read all available data from specific start offset and end offset.
+This is invoked during failure recovery to re-read a batch 
deterministically
+in order to achieve exactly once.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+end : dict
+The offset where the reading stop.
+
+Returns
+---
+iterator of :class:`Tuple`\\s
+All the records between start offset and end offset.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read2"},

Review Comment:
   Fixed.
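
   As additional context, a minimal sketch of what a user-defined reader built
on the API quoted above might look like (the counter-based offsets and records
are illustrative assumptions, not part of this PR):

```python
from pyspark.sql.datasource import SimpleDataSourceStreamReader


class CounterStreamReader(SimpleDataSourceStreamReader):
    """Emits consecutive integers; offsets are dicts like {"offset": 0}."""

    def initialOffset(self) -> dict:
        return {"offset": 0}

    def read(self, start: dict):
        # Read one micro-batch worth of records and return the end offset,
        # which becomes the start offset of the next read attempt.
        begin = start["offset"]
        end = {"offset": begin + 10}
        return iter([(i,) for i in range(begin, end["offset"])]), end

    def readBetweenOffsets(self, start: dict, end: dict):
        # Deterministically replay a committed range during failure recovery.
        return iter([(i,) for i in range(start["offset"], end["offset"])])
```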



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-19 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1573100690


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry(InputPartition):
+def __init__(self, start: dict, end: dict, it: Iterator[Tuple]):

Review Comment:
   Fixed.



##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry(InputPartition):
+def __init__(self, start: dict, end: dict, it: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.it = it
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers.
+Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+allows reading data and planning the latest offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+"""
+Read all available data from start offset and return the offset that 
next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end offset of this read attempt and the start of 
next read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+"""
+Read all available data from specific start offset and end offset.
+This is invoked during failure recovery to re-read a batch 
deterministically
+in order to achieve exactly once.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+end : dict
+The offset where the reading stop.
+
+Returns
+---
+iterator of :class:`Tuple`\\s
+All the records between start offset and end offset.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read2"},
+)
+
+def commit(self, end: dict) -> None:
+"""
+Informs the source that Spark has completed processing all data for 
offsets less than or
+equal to `end` and will only request offsets greater than `end` in the 
future.
+
+Parameters
+--
+end : dict
+The latest offset that the streaming query has processed for this 
source.
+"""
+...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine calls latestOffset(), the wrapper calls read() that 
starts from
+current_offset, prefetches and cache the data, then updates the 

Re: [PR] [SPARK-47148][SQL] Avoid to materialize AQE ExchangeQueryStageExec on the cancellation [spark]

2024-04-19 Thread via GitHub


erenavsarogullari commented on code in PR #45234:
URL: https://github.com/apache/spark/pull/45234#discussion_r1573096494


##
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##
@@ -897,6 +900,85 @@ class AdaptiveQueryExecSuite
 }
   }
 
+  test("SPARK-47148: AQE should avoid to materialize ShuffleQueryStage on the 
cancellation") {
+try {
+  spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: 
Nil
+  withSQLConf(
+SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true",
+SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+val joinedDF = createJoinedDF()
+
+val error = intercept[SparkException] {
+  joinedDF.collect()
+}
+assert(error.getMessage() contains "ProblematicCoalesce execution is 
failed")
+
+val adaptivePlan = 
joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+
+// All QueryStages should be based on ShuffleQueryStageExec
+val shuffleQueryStageExecs = collect(adaptivePlan) {
+  case sqse: ShuffleQueryStageExec => sqse
+}
+assert(shuffleQueryStageExecs.length == 3, s"Physical Plan should 
include " +
+  s"3 ShuffleQueryStages. Physical Plan: $adaptivePlan")
+shuffleQueryStageExecs.foreach(sqse => 
assert(sqse.name.contains("ShuffleQueryStageExec-")))
+// First ShuffleQueryStage is materialized so it needs to be canceled.
+assert(shuffleQueryStageExecs(0).isMaterializationStarted(),
+  "Materialization should be started.")
+// Second ShuffleQueryStage materialization is failed so
+// it is excluded from the cancellation due to earlyFailedStage.
+assert(shuffleQueryStageExecs(1).isMaterializationStarted(),
+  "Materialization should be started but it is failed.")
+// Last ShuffleQueryStage is not materialized yet so it does not 
require
+// to be canceled and it is just skipped from the cancellation.
+assert(!shuffleQueryStageExecs(2).isMaterializationStarted(),
+  "Materialization should not be started.")
+  }
+} finally {
+  spark.experimental.extraStrategies = Nil
+}
+  }
+
+  test("SPARK-47148: Check if BroadcastQueryStage materialization is started") 
{
+try {
+  spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: 
Nil
+  withSQLConf(
+SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
+val joinedDF = createJoinedDF()
+
+val error = intercept[SparkException] {
+  joinedDF.collect()
+}
+assert(error.getMessage() contains "ProblematicCoalesce execution is 
failed")
+
+val adaptivePlan = 
joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+
+// All QueryStages should be based on BroadcastQueryStageExec
+val broadcastQueryStageExecs = collect(adaptivePlan) {
+  case bqse: BroadcastQueryStageExec => bqse
+}
+assert(broadcastQueryStageExecs.length == 2, adaptivePlan)
+broadcastQueryStageExecs.foreach { bqse =>
+  assert(bqse.name.contains("BroadcastQueryStageExec-"))
+  // Both BroadcastQueryStages are materialized at the beginning.

Review Comment:
   I have added `BROADCAST` hint, however, second BroadcastQueryStage 
materialization still seems to be kicked off. Please see last commit.



##
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##
@@ -897,6 +900,85 @@ class AdaptiveQueryExecSuite
 }
   }
 
+  test("SPARK-47148: AQE should avoid to materialize ShuffleQueryStage on the 
cancellation") {
+try {
+  spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: 
Nil
+  withSQLConf(
+SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true",
+SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+val joinedDF = createJoinedDF()
+
+val error = intercept[SparkException] {
+  joinedDF.collect()
+}
+assert(error.getMessage() contains "ProblematicCoalesce execution is 
failed")
+
+val adaptivePlan = 
joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+
+// All QueryStages should be based on ShuffleQueryStageExec
+val shuffleQueryStageExecs = collect(adaptivePlan) {
+  case sqse: ShuffleQueryStageExec => sqse
+}
+assert(shuffleQueryStageExecs.length == 3, s"Physical Plan should 
include " +
+  s"3 ShuffleQueryStages. Physical Plan: $adaptivePlan")
+shuffleQueryStageExecs.foreach(sqse => 
assert(sqse.name.contains("ShuffleQueryStageExec-")))
+// First 

Re: [PR] [WIP] Only test rocksdbjni 9.x [spark]

2024-04-19 Thread via GitHub


panbingkun commented on PR #46146:
URL: https://github.com/apache/spark/pull/46146#issuecomment-2067487411

   At present, we are only testing the `rocksdbjni` `9.x` series in advance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [WIP] Only test rocksdbjni 9.x [spark]

2024-04-19 Thread via GitHub


panbingkun opened a new pull request, #46146:
URL: https://github.com/apache/spark/pull/46146

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47148][SQL] Avoid to materialize AQE ExchangeQueryStageExec on the cancellation [spark]

2024-04-19 Thread via GitHub


erenavsarogullari commented on code in PR #45234:
URL: https://github.com/apache/spark/pull/45234#discussion_r1573096494


##
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##
@@ -897,6 +900,85 @@ class AdaptiveQueryExecSuite
 }
   }
 
+  test("SPARK-47148: AQE should avoid to materialize ShuffleQueryStage on the 
cancellation") {
+try {
+  spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: 
Nil
+  withSQLConf(
+SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true",
+SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+val joinedDF = createJoinedDF()
+
+val error = intercept[SparkException] {
+  joinedDF.collect()
+}
+assert(error.getMessage() contains "ProblematicCoalesce execution is 
failed")
+
+val adaptivePlan = 
joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+
+// All QueryStages should be based on ShuffleQueryStageExec
+val shuffleQueryStageExecs = collect(adaptivePlan) {
+  case sqse: ShuffleQueryStageExec => sqse
+}
+assert(shuffleQueryStageExecs.length == 3, s"Physical Plan should 
include " +
+  s"3 ShuffleQueryStages. Physical Plan: $adaptivePlan")
+shuffleQueryStageExecs.foreach(sqse => 
assert(sqse.name.contains("ShuffleQueryStageExec-")))
+// First ShuffleQueryStage is materialized so it needs to be canceled.
+assert(shuffleQueryStageExecs(0).isMaterializationStarted(),
+  "Materialization should be started.")
+// Second ShuffleQueryStage materialization is failed so
+// it is excluded from the cancellation due to earlyFailedStage.
+assert(shuffleQueryStageExecs(1).isMaterializationStarted(),
+  "Materialization should be started but it is failed.")
+// Last ShuffleQueryStage is not materialized yet so it does not 
require
+// to be canceled and it is just skipped from the cancellation.
+assert(!shuffleQueryStageExecs(2).isMaterializationStarted(),
+  "Materialization should not be started.")
+  }
+} finally {
+  spark.experimental.extraStrategies = Nil
+}
+  }
+
+  test("SPARK-47148: Check if BroadcastQueryStage materialization is started") 
{
+try {
+  spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: 
Nil
+  withSQLConf(
+SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
+val joinedDF = createJoinedDF()
+
+val error = intercept[SparkException] {
+  joinedDF.collect()
+}
+assert(error.getMessage() contains "ProblematicCoalesce execution is 
failed")
+
+val adaptivePlan = 
joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+
+// All QueryStages should be based on BroadcastQueryStageExec
+val broadcastQueryStageExecs = collect(adaptivePlan) {
+  case bqse: BroadcastQueryStageExec => bqse
+}
+assert(broadcastQueryStageExecs.length == 2, adaptivePlan)
+broadcastQueryStageExecs.foreach { bqse =>
+  assert(bqse.name.contains("BroadcastQueryStageExec-"))
+  // Both BroadcastQueryStages are materialized at the beginning.

Review Comment:
   I have added the `BROADCAST` hint; however, both BroadcastQueryStage
materializations still seem to be kicked off. Please see the last commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47148][SQL] Avoid to materialize AQE ExchangeQueryStageExec on the cancellation [spark]

2024-04-19 Thread via GitHub


erenavsarogullari commented on code in PR #45234:
URL: https://github.com/apache/spark/pull/45234#discussion_r1573095590


##
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##
@@ -897,6 +900,85 @@ class AdaptiveQueryExecSuite
 }
   }
 
+  test("SPARK-47148: AQE should avoid to materialize ShuffleQueryStage on the 
cancellation") {
+try {
+  spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: 
Nil
+  withSQLConf(
+SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true",

Review Comment:
   Addressed



##
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##
@@ -897,6 +900,85 @@ class AdaptiveQueryExecSuite
 }
   }
 
+  test("SPARK-47148: AQE should avoid to materialize ShuffleQueryStage on the 
cancellation") {
+try {
+  spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: 
Nil
+  withSQLConf(
+SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true",
+SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+val joinedDF = createJoinedDF()
+
+val error = intercept[SparkException] {
+  joinedDF.collect()
+}
+assert(error.getMessage() contains "ProblematicCoalesce execution is 
failed")
+
+val adaptivePlan = 
joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+
+// All QueryStages should be based on ShuffleQueryStageExec
+val shuffleQueryStageExecs = collect(adaptivePlan) {
+  case sqse: ShuffleQueryStageExec => sqse
+}
+assert(shuffleQueryStageExecs.length == 3, s"Physical Plan should 
include " +
+  s"3 ShuffleQueryStages. Physical Plan: $adaptivePlan")
+shuffleQueryStageExecs.foreach(sqse => 
assert(sqse.name.contains("ShuffleQueryStageExec-")))
+// First ShuffleQueryStage is materialized so it needs to be canceled.
+assert(shuffleQueryStageExecs(0).isMaterializationStarted(),
+  "Materialization should be started.")
+// Second ShuffleQueryStage materialization is failed so
+// it is excluded from the cancellation due to earlyFailedStage.
+assert(shuffleQueryStageExecs(1).isMaterializationStarted(),
+  "Materialization should be started but it is failed.")
+// Last ShuffleQueryStage is not materialized yet so it does not 
require
+// to be canceled and it is just skipped from the cancellation.
+assert(!shuffleQueryStageExecs(2).isMaterializationStarted(),
+  "Materialization should not be started.")
+  }
+} finally {
+  spark.experimental.extraStrategies = Nil
+}
+  }
+
+  test("SPARK-47148: Check if BroadcastQueryStage materialization is started") 
{
+try {
+  spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: 
Nil
+  withSQLConf(
+SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47909][PYTHON][CONNECT] Parent DataFrame class for Spark Connect and Spark Classic [spark]

2024-04-19 Thread via GitHub


HyukjinKwon commented on code in PR #46129:
URL: https://github.com/apache/spark/pull/46129#discussion_r1573095371


##
python/pyspark/sql/classic/dataframe.py:
##
@@ -0,0 +1,1974 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import json
+import sys
+import random
+import warnings
+from collections.abc import Iterable
+from functools import reduce
+from typing import (
+Any,
+Callable,
+Dict,
+Iterator,
+List,
+Optional,
+Sequence,
+Tuple,
+Type,
+Union,
+cast,
+overload,
+TYPE_CHECKING,
+)
+
+from pyspark import _NoValue
+from pyspark.resource import ResourceProfile
+from pyspark._globals import _NoValueType
+from pyspark.errors import (
+PySparkTypeError,
+PySparkValueError,
+PySparkIndexError,
+PySparkAttributeError,
+)
+from pyspark.util import (
+is_remote_only,
+_load_from_socket,
+_local_iterator_from_socket,
+)
+from pyspark.serializers import BatchedSerializer, CPickleSerializer, 
UTF8Deserializer
+from pyspark.storagelevel import StorageLevel
+from pyspark.traceback_utils import SCCallSiteSync
+from pyspark.sql.column import Column, _to_seq, _to_list, _to_java_column
+from pyspark.sql.readwriter import DataFrameWriter, DataFrameWriterV2
+from pyspark.sql.streaming import DataStreamWriter
+from pyspark.sql.types import (
+StructType,
+Row,
+_parse_datatype_json_string,
+)
+from pyspark.sql.dataframe import (
+DataFrame as ParentDataFrame,
+DataFrameNaFunctions as ParentDataFrameNaFunctions,
+DataFrameStatFunctions as ParentDataFrameStatFunctions,
+)
+from pyspark.sql.utils import get_active_spark_context, toJArray
+from pyspark.sql.pandas.conversion import PandasConversionMixin
+from pyspark.sql.pandas.map_ops import PandasMapOpsMixin
+
+if TYPE_CHECKING:
+from py4j.java_gateway import JavaObject
+from pyspark.core.rdd import RDD
+from pyspark.core.context import SparkContext
+from pyspark._typing import PrimitiveType
+from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame
+from pyspark.sql._typing import (
+ColumnOrName,
+ColumnOrNameOrOrdinal,
+LiteralType,
+OptionalPrimitiveType,
+)
+from pyspark.sql.context import SQLContext
+from pyspark.sql.session import SparkSession
+from pyspark.sql.group import GroupedData
+from pyspark.sql.observation import Observation
+
+
+class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin):
+def __new__(
+cls,
+jdf: "JavaObject",
+sql_ctx: Union["SQLContext", "SparkSession"],
+) -> "DataFrame":
+self = object.__new__(cls)
+self.__init__(jdf, sql_ctx)  # type: ignore[misc]
+return self
+
+def __init__(
+self,
+jdf: "JavaObject",
+sql_ctx: Union["SQLContext", "SparkSession"],
+):
+from pyspark.sql.context import SQLContext
+
+self._sql_ctx: Optional["SQLContext"] = None
+
+if isinstance(sql_ctx, SQLContext):
+assert not os.environ.get("SPARK_TESTING")  # Sanity check for our 
internal usage.
+assert isinstance(sql_ctx, SQLContext)
+# We should remove this if-else branch in the future release, and 
rename
+# sql_ctx to session in the constructor. This is an internal code 
path but
+# was kept with a warning because it's used intensively by 
third-party libraries.
+warnings.warn("DataFrame constructor is internal. Do not directly 
use it.")
+self._sql_ctx = sql_ctx
+session = sql_ctx.sparkSession
+else:
+session = sql_ctx
+self._session: "SparkSession" = session
+
+self._sc: "SparkContext" = sql_ctx._sc
+self._jdf: "JavaObject" = jdf
+self.is_cached = False
+# initialized lazily
+self._schema: Optional[StructType] = None
+self._lazy_rdd: Optional["RDD[Row]"] = None
+# Check whether _repr_html is supported or not, we use it to avoid 
calling _jdf twice
+# by __repr__ and _repr_html_ while eager evaluation opens.
+self._support_repr_html = False
+
+@property
+def 

Re: [PR] [SPARK-47909][PYTHON][CONNECT] Parent DataFrame class for Spark Connect and Spark Classic [spark]

2024-04-19 Thread via GitHub


HyukjinKwon commented on code in PR #46129:
URL: https://github.com/apache/spark/pull/46129#discussion_r1573095413


##
python/pyspark/sql/classic/dataframe.py:
##
@@ -0,0 +1,1974 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import json
+import sys
+import random
+import warnings
+from collections.abc import Iterable
+from functools import reduce
+from typing import (
+Any,
+Callable,
+Dict,
+Iterator,
+List,
+Optional,
+Sequence,
+Tuple,
+Type,
+Union,
+cast,
+overload,
+TYPE_CHECKING,
+)
+
+from pyspark import _NoValue
+from pyspark.resource import ResourceProfile
+from pyspark._globals import _NoValueType
+from pyspark.errors import (
+PySparkTypeError,
+PySparkValueError,
+PySparkIndexError,
+PySparkAttributeError,
+)
+from pyspark.util import (
+is_remote_only,
+_load_from_socket,
+_local_iterator_from_socket,
+)
+from pyspark.serializers import BatchedSerializer, CPickleSerializer, 
UTF8Deserializer
+from pyspark.storagelevel import StorageLevel
+from pyspark.traceback_utils import SCCallSiteSync
+from pyspark.sql.column import Column, _to_seq, _to_list, _to_java_column
+from pyspark.sql.readwriter import DataFrameWriter, DataFrameWriterV2
+from pyspark.sql.streaming import DataStreamWriter
+from pyspark.sql.types import (
+StructType,
+Row,
+_parse_datatype_json_string,
+)
+from pyspark.sql.dataframe import (
+DataFrame as ParentDataFrame,
+DataFrameNaFunctions as ParentDataFrameNaFunctions,
+DataFrameStatFunctions as ParentDataFrameStatFunctions,
+)
+from pyspark.sql.utils import get_active_spark_context, toJArray
+from pyspark.sql.pandas.conversion import PandasConversionMixin
+from pyspark.sql.pandas.map_ops import PandasMapOpsMixin
+
+if TYPE_CHECKING:
+from py4j.java_gateway import JavaObject
+from pyspark.core.rdd import RDD
+from pyspark.core.context import SparkContext
+from pyspark._typing import PrimitiveType
+from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame
+from pyspark.sql._typing import (
+ColumnOrName,
+ColumnOrNameOrOrdinal,
+LiteralType,
+OptionalPrimitiveType,
+)
+from pyspark.sql.context import SQLContext
+from pyspark.sql.session import SparkSession
+from pyspark.sql.group import GroupedData
+from pyspark.sql.observation import Observation
+
+
+class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin):
+def __new__(
+cls,
+jdf: "JavaObject",
+sql_ctx: Union["SQLContext", "SparkSession"],
+) -> "DataFrame":
+self = object.__new__(cls)
+self.__init__(jdf, sql_ctx)  # type: ignore[misc]
+return self
+
+def __init__(
+self,
+jdf: "JavaObject",
+sql_ctx: Union["SQLContext", "SparkSession"],
+):
+from pyspark.sql.context import SQLContext
+
+self._sql_ctx: Optional["SQLContext"] = None
+
+if isinstance(sql_ctx, SQLContext):
+assert not os.environ.get("SPARK_TESTING")  # Sanity check for our 
internal usage.
+assert isinstance(sql_ctx, SQLContext)
+# We should remove this if-else branch in the future release, and 
rename
+# sql_ctx to session in the constructor. This is an internal code 
path but
+# was kept with a warning because it's used intensively by 
third-party libraries.
+warnings.warn("DataFrame constructor is internal. Do not directly 
use it.")
+self._sql_ctx = sql_ctx
+session = sql_ctx.sparkSession
+else:
+session = sql_ctx
+self._session: "SparkSession" = session
+
+self._sc: "SparkContext" = sql_ctx._sc
+self._jdf: "JavaObject" = jdf
+self.is_cached = False
+# initialized lazily
+self._schema: Optional[StructType] = None
+self._lazy_rdd: Optional["RDD[Row]"] = None
+# Check whether _repr_html is supported or not, we use it to avoid 
calling _jdf twice
+# by __repr__ and _repr_html_ while eager evaluation opens.
+self._support_repr_html = False
+
+@property
+def 

Re: [PR] [SPARK-47909][PYTHON][CONNECT] Parent DataFrame class for Spark Connect and Spark Classic [spark]

2024-04-19 Thread via GitHub


HyukjinKwon commented on code in PR #46129:
URL: https://github.com/apache/spark/pull/46129#discussion_r1573094548


##
python/pyspark/sql/utils.py:
##
@@ -302,6 +302,33 @@ def wrapped(*args: Any, **kwargs: Any) -> Any:
 return cast(FuncT, wrapped)
 
 
+def dispatch_df_method(f: FuncT) -> FuncT:
+"""
+For the usecases of direct DataFrame.union(df, ...), it checks if self
+is a Connect DataFrame or Classic DataFrame, and dispatches.
+"""
+
+@functools.wraps(f)
+def wrapped(*args: Any, **kwargs: Any) -> Any:
+if is_remote() and "PYSPARK_NO_NAMESPACE_SHARE" not in os.environ:
+from pyspark.sql.connect.dataframe import DataFrame as 
ConnectDataFrame
+
+if isinstance(args[0], ConnectDataFrame):
+return getattr(ConnectDataFrame, f.__name__)(*args, **kwargs)
+else:
+from pyspark.sql.classic.dataframe import DataFrame as 
ClassicDataFrame

Review Comment:
   It should be covered by `is_remote`, which is `True` when `is_remote_only`
is `True`.
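
   For illustration, a sketch of how such a dispatching decorator would be used
on the parent class (the `union` method here is only an example of the usage
named in the docstring above, not the exact code in this PR):

```python
from pyspark.sql.utils import dispatch_df_method


class DataFrame:
    @dispatch_df_method
    def union(self, other: "DataFrame") -> "DataFrame":
        """Routed to the Connect or Classic implementation based on `self`."""
        ...
```

   A direct call like `DataFrame.union(df, other)` then resolves to the Connect
or Classic subclass depending on the concrete type of `df`.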



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47909][PYTHON][CONNECT] Parent DataFrame class for Spark Connect and Spark Classic [spark]

2024-04-19 Thread via GitHub


HyukjinKwon commented on code in PR #46129:
URL: https://github.com/apache/spark/pull/46129#discussion_r1573094799


##
python/pyspark/sql/connect/session.py:
##
@@ -325,7 +325,7 @@ def active(cls) -> "SparkSession":
 
 active.__doc__ = PySparkSession.active.__doc__
 
-def table(self, tableName: str) -> DataFrame:
+def table(self, tableName: str) -> ParentDataFrame:

Review Comment:
   This was the way MyPy complained the least, IIRC. Let me take a look again...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47925][SQL][TESTS] Mark `BloomFilterAggregateQuerySuite` as `ExtendedSQLTest` [spark]

2024-04-19 Thread via GitHub


dongjoon-hyun opened a new pull request, #46145:
URL: https://github.com/apache/spark/pull/46145

   …
   
   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-19 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1573092072


##
python/pyspark/sql/datasource.py:
##
@@ -183,11 +186,40 @@ def streamWriter(self, schema: StructType, overwrite: 
bool) -> "DataSourceStream
 message_parameters={"feature": "streamWriter"},
 )
 
+def _streamReader(self, schema: StructType) -> "DataSourceStreamReader":

Review Comment:
   This is a private method to fall back to the simple reader when the streaming
reader is not implemented. I would be OK with changing this if there is an
alternative.
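
   A rough sketch of the fallback idea described here (the `streamReader` /
`simpleStreamReader` names and the exception handling are assumptions for
illustration, not necessarily the exact code in this PR):

```python
# Simplified sketch of DataSource._streamReader:
def _streamReader(self, schema: StructType) -> "DataSourceStreamReader":
    try:
        # Prefer the full streaming reader when the user implements it.
        return self.streamReader(schema)
    except PySparkNotImplementedError:
        # Otherwise wrap the simple reader so the engine can use it as usual.
        return _SimpleStreamReaderWrapper(self.simpleStreamReader(schema))
```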



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-19 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1573085074


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry(InputPartition):

Review Comment:
   Where should we move these functions to if we don't want them to be public 
API?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-19 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1573085009


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):

Review Comment:
   The simple reader doesn't expose partitioning to the user, so this is only
used by the wrapper to make the simple reader integrate with the streaming
engine. Where should we move this internal code if we don't want it to be a
public API?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47924][CORE] Add a DEBUG log to `DiskStore.moveFileToBlock` [spark]

2024-04-19 Thread via GitHub


dongjoon-hyun opened a new pull request, #46144:
URL: https://github.com/apache/spark/pull/46144

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-19335][SPARK-38200][SQL] Add upserts for writing to JDBC [spark]

2024-04-19 Thread via GitHub


github-actions[bot] commented on PR #41518:
URL: https://github.com/apache/spark/pull/41518#issuecomment-2067417265

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47922][SQL] Implement the try_parse_json expression [spark]

2024-04-19 Thread via GitHub


harshmotw-db commented on PR #46141:
URL: https://github.com/apache/spark/pull/46141#issuecomment-2067417422

   cc @chenhao-db @cloud-fan


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-45709][BUILD] Deploy packages when all packages are built [spark]

2024-04-19 Thread via GitHub


github-actions[bot] commented on PR #43561:
URL: https://github.com/apache/spark/pull/43561#issuecomment-2067417258

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [WIP][SPARK-47672][SQL] Avoid double eval from filter pushDown w/ projection pushdown [spark]

2024-04-19 Thread via GitHub


holdenk opened a new pull request, #46143:
URL: https://github.com/apache/spark/pull/46143

   ### What changes were proposed in this pull request?
   
   Changes the filter pushDown optimizer to not push down past projections of 
the same element if we reasonably expect that computing that element is likely 
to be expensive.
   
   This is a more complex alternative to 
https://github.com/apache/spark/pull/45802 which also moves parts of 
projections down so that the filters can move further down.
   
   This introduces an "expectedCost" mechanism which we may or may not want. 
Previous filter ordering work used filter pushdowns as an approximation of 
expression cost but here we need more granularity. As an alternative we could 
introduce a flag for expensive rather than numeric operations. Another 
alternative would be seeing if the predicate can be "converted" as a proxy for 
cheap.
   
   ### Future Work / What else remains to do?
   
   Right now, if a cond is expensive and it references something in the 
projection, we don't push down. We could probably do better and gate this on 
whether the thing we are referencing is expensive rather than the condition 
itself. We could do this as a follow-up item or as part of this PR.
   
   ### Why are the changes needed?
   
   Currently Spark may double compute expensive operations (like json parsing, 
UDF eval, etc.) as a result of filter pushdown past projections.
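
   A minimal PySpark sketch of the pattern (illustrative only; schema and data 
are made up). With naive pushdown of the filter below the projection, the 
`from_json` call can be evaluated once for the pushed-down predicate and again 
for the projected column:
   ```
   from pyspark.sql import SparkSession
   from pyspark.sql import functions as F

   spark = SparkSession.builder.getOrCreate()

   df = spark.createDataFrame([('{"x": 1}',), ('{"x": 5}',)], ["raw"])

   parsed = df.select(F.from_json("raw", "x INT").alias("j"))
   filtered = parsed.filter(F.col("j.x") > 1)

   # Inspect whether from_json shows up in both the Filter and the Project.
   filtered.explain(True)
   ```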
   
   ### Does this PR introduce _any_ user-facing change?
   
   The SQL optimizer change may impact some user queries; results should be the 
same and hopefully a little faster.
   
   ### How was this patch tested?
   
   New tests were added to the FilterPushDownSuite, and the initial problem of 
double evaluation was confirmed with a GitHub gist.
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47923][R] Upgrade the minimum version of `arrow` R package to 10.0.0 [spark]

2024-04-19 Thread via GitHub


dongjoon-hyun opened a new pull request, #46142:
URL: https://github.com/apache/spark/pull/46142

   …
   
   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47903][PYTHON] Add support for remaining scalar types in the PySpark Variant library [spark]

2024-04-19 Thread via GitHub


harshmotw-db commented on code in PR #46122:
URL: https://github.com/apache/spark/pull/46122#discussion_r1573032141


##
python/pyspark/sql/types.py:
##
@@ -1521,6 +1521,19 @@ def toPython(self) -> Any:
 """
 return VariantUtils.to_python(self.value, self.metadata)
 
+def toJson(self, zone_id: str = "UTC") -> str:

Review Comment:
   Resolved



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47903][PYTHON] Add support for remaining scalar types in the PySpark Variant library [spark]

2024-04-19 Thread via GitHub


harshmotw-db commented on code in PR #46122:
URL: https://github.com/apache/spark/pull/46122#discussion_r1573031748


##
python/pyspark/sql/types.py:
##
@@ -1521,6 +1521,19 @@ def toPython(self) -> Any:
 """
 return VariantUtils.to_python(self.value, self.metadata)
 
+def toJson(self, zone_id: str = "UTC") -> str:

Review Comment:
   Thanks! I was trying to figure out why document generation was failing in 
the linting jobs. I thought it was a syntax error in Sphinx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47903][PYTHON] Add support for remaining scalar types in the PySpark Variant library [spark]

2024-04-19 Thread via GitHub


gene-db commented on code in PR #46122:
URL: https://github.com/apache/spark/pull/46122#discussion_r1573029280


##
python/pyspark/sql/types.py:
##
@@ -1521,6 +1521,19 @@ def toPython(self) -> Any:
 """
 return VariantUtils.to_python(self.value, self.metadata)
 
+def toJson(self, zone_id: str = "UTC") -> str:

Review Comment:
   You might need to update this 
https://github.com/databricks/runtime/blob/master/python/docs/source/reference/pyspark.sql/variant_val.rst?plain=1#L27
   to generate the docs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP] Testing that error is propagated to user upon deserialization [spark]

2024-04-19 Thread via GitHub


ericm-db commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1573000267


##
python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py:
##
@@ -66,6 +66,30 @@ def func(df, _):
 q = df.writeStream.foreachBatch(func).start()
 q.processAllAvailable()
 
+def test_pickling_deserialization_error(self):
+class NoUnpickle:
+def __init__(self, data):
+self.data = data
+
+def __reduce__(self):
+# Serialize only the data attribute
+return (self.__class__, (self.data,))

Review Comment:
   Ah yeah, will make the refactor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP] Testing that error is propagated to user upon deserialization [spark]

2024-04-19 Thread via GitHub


rangadi commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1572997060


##
python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py:
##
@@ -66,6 +66,30 @@ def func(df, _):
 q = df.writeStream.foreachBatch(func).start()
 q.processAllAvailable()
 
+def test_pickling_deserialization_error(self):
+class NoUnpickle:
+def __init__(self, data):
+self.data = data
+
+def __reduce__(self):
+# Serialize only the data attribute
+return (self.__class__, (self.data,))

Review Comment:
   I think here, you should return a function that throws.
   ```
    def __reduce__(self):
        return (error_fn, ())

    def error_fn():
        raise ...
   ```
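
   A runnable sketch of that pattern (names and the exception type are just 
examples):
   ```
   import pickle

   def error_fn():
       # Called during unpickling; simulates a deserialization failure.
       raise RuntimeError("deserialization failed on the worker")

   class NoUnpickle:
       def __reduce__(self):
           # Pickling succeeds, but unpickling calls a function that raises.
           return (error_fn, ())

   data = pickle.dumps(NoUnpickle())   # works
   try:
       pickle.loads(data)              # raises RuntimeError
   except RuntimeError as e:
       print("unpickling failed:", e)
   ```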



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP] Testing that error is propagated to user upon deserialization [spark]

2024-04-19 Thread via GitHub


rangadi commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1572997327


##
python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py:
##
@@ -66,6 +66,30 @@ def func(df, _):
 q = df.writeStream.foreachBatch(func).start()
 q.processAllAvailable()
 
+def test_pickling_deserialization_error(self):
+class NoUnpickle:
+def __init__(self, data):
+self.data = data
+
+def __reduce__(self):
+# Serialize only the data attribute
+return (self.__class__, (self.data,))
+
+def __reduce_ex__(self, proto):

Review Comment:
   remove this to avoid confusion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP] Testing that error is propagated to user upon deserialization [spark]

2024-04-19 Thread via GitHub


rangadi commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1572995277


##
python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py:
##
@@ -63,8 +63,13 @@ def main(infile: IO, outfile: IO) -> None:
 spark = spark_connect_session
 
 # TODO(SPARK-44461): Enable Process Isolation
+try:

Review Comment:
   Can this try be around the while loop below? I.e.
   ```
   try:
func = worker.read_command(pickle_ser, infile)
write_int...
while True:
..
   except
   ```
   Note that currently, errors around `read_long()` are not handled well.
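
   A rough sketch of that shape (hypothetical helper names, not the actual 
worker code), so that any failure -- including an unpickling error in 
`read_command` -- is reported back instead of killing the worker silently:
   ```
   def serve(worker, pickle_ser, infile, outfile, write_int, handle_batch, report_error):
       # Hypothetical wrapper; the real module is foreach_batch_worker.py.
       try:
           func = worker.read_command(pickle_ser, infile)  # may fail to deserialize
           write_int(0, outfile)                           # ack successful setup
           outfile.flush()
           while True:
               handle_batch(func, infile, outfile)         # read_long(), run func, ack
       except Exception as exc:
           # Propagate the error to the driver side rather than exiting silently.
           report_error(exc, outfile)
   ```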



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] Try parse json [spark]

2024-04-19 Thread via GitHub


harshmotw-db opened a new pull request, #46141:
URL: https://github.com/apache/spark/pull/46141

   
   
   ### What changes were proposed in this pull request?
   
   This pull request implements the `try_parse_json` expression, which runs 
`parse_json` on string expressions to extract variants. However, if `parse_json` 
would throw an exception on a row, the value `null` is returned instead.
   
   ### Why are the changes needed?
   
   Sometimes, columns containing JSON strings may contain some invalid inputs 
that should be ignored instead of failing the whole execution because of them.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, it allows users to run the `try_parse_json` expression.
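
   A hedged sketch of the intended behavior (assuming an active SparkSession 
`spark`; exact output formatting may differ):
   ```
   df = spark.createDataFrame([('{"a": 1}',), ("not json",)], ["s"])
   # Valid JSON parses to a variant value; invalid JSON yields NULL instead of
   # failing the whole query.
   df.selectExpr("try_parse_json(s) AS v").show(truncate=False)
   ```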
   
   ### How was this patch tested?
   
   Unit tests to check if `try_parse_json` works just like `parse_json` on 
valid inputs, returns `null` on invalid inputs, and fails on incorrect input 
data types.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47909][PYTHON][CONNECT] Parent DataFrame class for Spark Connect and Spark Classic [spark]

2024-04-19 Thread via GitHub


ueshin commented on code in PR #46129:
URL: https://github.com/apache/spark/pull/46129#discussion_r1572787315


##
python/pyspark/sql/dataframe.py:
##
@@ -139,51 +123,29 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 created via using the constructor.
 """
 
-def __init__(
-self,
+# HACK ALERT!! this is to reduce the backward compatibility concern, and 
returns
+# Spark Classic DataFrame by default. This is NOT an API, and NOT supposed 
to
+# be directly invoked. DO NOT use this constructor.
+_sql_ctx: Optional["SQLContext"]
+_session: "SparkSession"
+_sc: "SparkContext"
+_jdf: "JavaObject"
+is_cached: bool
+_schema: Optional[StructType]
+_lazy_rdd: Optional["RDD[Row]"]
+_support_repr_html: bool
+
+def __new__(
+cls,
 jdf: "JavaObject",
 sql_ctx: Union["SQLContext", "SparkSession"],
-):
-from pyspark.sql.context import SQLContext
-
-self._sql_ctx: Optional["SQLContext"] = None
-
-if isinstance(sql_ctx, SQLContext):
-assert not os.environ.get("SPARK_TESTING")  # Sanity check for our 
internal usage.
-assert isinstance(sql_ctx, SQLContext)
-# We should remove this if-else branch in the future release, and 
rename
-# sql_ctx to session in the constructor. This is an internal code 
path but
-# was kept with a warning because it's used intensively by 
third-party libraries.
-warnings.warn("DataFrame constructor is internal. Do not directly 
use it.")
-self._sql_ctx = sql_ctx
-session = sql_ctx.sparkSession
-else:
-session = sql_ctx
-self._session: "SparkSession" = session
-
-self._sc: "SparkContext" = sql_ctx._sc
-self._jdf: "JavaObject" = jdf
-self.is_cached = False
-# initialized lazily
-self._schema: Optional[StructType] = None
-self._lazy_rdd: Optional["RDD[Row]"] = None
-# Check whether _repr_html is supported or not, we use it to avoid 
calling _jdf twice
-# by __repr__ and _repr_html_ while eager evaluation opens.
-self._support_repr_html = False
-
-@property
-def sql_ctx(self) -> "SQLContext":
-from pyspark.sql.context import SQLContext
+) -> "DataFrame":
+from pyspark.sql.classic.dataframe import DataFrame
 
-warnings.warn(
-"DataFrame.sql_ctx is an internal property, and will be removed "
-"in future releases. Use DataFrame.sparkSession instead."
-)
-if self._sql_ctx is None:
-self._sql_ctx = SQLContext._get_or_create(self._sc)
-return self._sql_ctx
+return DataFrame.__new__(DataFrame, jdf, sql_ctx)
 
 @property
+@dispatch_df_method

Review Comment:
   The dispatch for `property` seems not working.
   
   ```py
   >>> class A:
   ...   @property
   ...   @dispatch_df_method
   ...   def a(self):
   ... return 1
   >>>
   >>> a = A()
   >>> A.a(a)
   Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
   TypeError: 'property' object is not callable
   ```
   
   I don't think we need this for `property` as this usage of `property` won't 
work anyway?



##
python/pyspark/sql/classic/dataframe.py:
##
@@ -0,0 +1,1974 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import json
+import sys
+import random
+import warnings
+from collections.abc import Iterable
+from functools import reduce
+from typing import (
+Any,
+Callable,
+Dict,
+Iterator,
+List,
+Optional,
+Sequence,
+Tuple,
+Type,
+Union,
+cast,
+overload,
+TYPE_CHECKING,
+)
+
+from pyspark import _NoValue
+from pyspark.resource import ResourceProfile
+from pyspark._globals import _NoValueType
+from pyspark.errors import (
+PySparkTypeError,
+PySparkValueError,
+PySparkIndexError,
+PySparkAttributeError,
+)
+from pyspark.util import (
+is_remote_only,
+_load_from_socket,
+_local_iterator_from_socket,
+)
+from pyspark.serializers import BatchedSerializer, CPickleSerializer, 
UTF8Deserializer
+from pyspark.storagelevel import 

Re: [PR] [SPARK-47921][CONNECT] Fix ExecuteJobTag creation in ExecuteHolder [spark]

2024-04-19 Thread via GitHub


allisonwang-db commented on PR #46140:
URL: https://github.com/apache/spark/pull/46140#issuecomment-2067273091

   cc @jasonli-db @HyukjinKwon 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47921][CONNECT] Fix ExecuteJobTag creation in ExecuteHolder [spark]

2024-04-19 Thread via GitHub


allisonwang-db opened a new pull request, #46140:
URL: https://github.com/apache/spark/pull/46140

   
   
   ### What changes were proposed in this pull request?
   
   This PR fixes a bug in the ExecuteJobTag creation in ExecuteHolder. The 
sessionId and userId are reversed.
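
   A generic illustration of the failure mode (this is not the actual tag format 
used by Spark Connect, just a hypothetical builder):
   ```
   def execute_job_tag(user_id: str, session_id: str, operation_id: str) -> str:
       return f"User_{user_id}_Session_{session_id}_Operation_{operation_id}"

   # Swapping the first two arguments still type-checks but produces a tag whose
   # user and session parts are exchanged.
   assert execute_job_tag("sess-123", "alice", "op-1") != execute_job_tag("alice", "sess-123", "op-1")
   ```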
   
   ### Why are the changes needed?
   
   To fix a bug
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Existing test
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-19 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1572915019


##
python/pyspark/sql/datasource.py:
##
@@ -183,11 +186,40 @@ def streamWriter(self, schema: StructType, overwrite: 
bool) -> "DataSourceStream
 message_parameters={"feature": "streamWriter"},
 )
 
+def _streamReader(self, schema: StructType) -> "DataSourceStreamReader":
+try:
+return self.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(self.simpleStreamReader(schema=schema))

Review Comment:
   Make sense!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-19 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1572914583


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry(InputPartition):
+def __init__(self, start: dict, end: dict, it: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.it = it
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers.
+Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+allows reading data and planning the latest offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+"""
+Read all available data from start offset and return the offset that 
next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end offset of this read attempt and the start of 
next read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+"""
+Read all available data from specific start offset and end offset.
+This is invoked during failure recovery to re-read a batch 
deterministically
+in order to achieve exactly once.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+end : dict
+The offset where the reading stop.
+
+Returns
+---
+iterator of :class:`Tuple`\\s
+All the records between start offset and end offset.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read2"},
+)
+
+def commit(self, end: dict) -> None:
+"""
+Informs the source that Spark has completed processing all data for 
offsets less than or
+equal to `end` and will only request offsets greater than `end` in the 
future.
+
+Parameters
+--
+end : dict
+The latest offset that the streaming query has processed for this 
source.
+"""
+...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query start for the first time or initialized to be 
the end offset of
+the last committed batch when query restarts.
+
+When streaming engine calls latestOffset(), the wrapper calls read() that 
starts from
+current_offset, prefetches and cache the data, then updates the 
current_offset to be
+the end offset of the new data.
+
+When streaming engine call planInputPartitions(start, end), the wrapper 
get the prefetched data
+from cache and send it to JVM along with the input partitions.
+
+When query restart, batches in write ahead offset log that has not been 
committed will be
+replayed by reading data between start and end offset through read2(start, 

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-19 Thread via GitHub


allisonwang-db commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1572908487


##
python/pyspark/sql/datasource.py:
##
@@ -183,11 +186,40 @@ def streamWriter(self, schema: StructType, overwrite: 
bool) -> "DataSourceStream
 message_parameters={"feature": "streamWriter"},
 )
 
+def _streamReader(self, schema: StructType) -> "DataSourceStreamReader":
+try:
+return self.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(self.simpleStreamReader(schema=schema))
+
+def simpleStreamReader(self, schema: StructType) -> 
"SimpleDataSourceStreamReader":
+"""
+Returns a :class:`SimpleDataSourceStreamReader` instance for reading 
data.
+
+One of simpleStreamReader() and streamReader() must be implemented for 
readable streaming
+data source.

Review Comment:
   Can we be more explicit about when users should choose streamReader versus 
simpleStreamReader here? This information will be included in the API 
documentation for this class.



##
python/pyspark/sql/worker/plan_data_source_read.py:
##
@@ -51,6 +52,71 @@
 )
 
 
+def records_to_arrow_batches(
+output_iter: Iterator[Tuple],
+max_arrow_batch_size: int,
+return_type: StructType,
+data_source: DataSource,
+) -> Iterable[pa.RecordBatch]:

Review Comment:
   Let's add some docstring for this function.



##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):

Review Comment:
   Why do we need this in the public API? Why can't user define their own input 
partition class?



##
python/pyspark/sql/datasource.py:
##
@@ -183,11 +186,40 @@ def streamWriter(self, schema: StructType, overwrite: 
bool) -> "DataSourceStream
 message_parameters={"feature": "streamWriter"},
 )
 
+def _streamReader(self, schema: StructType) -> "DataSourceStreamReader":

Review Comment:
   Why do we need this `_streamReader` in datasource API?



##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry(InputPartition):
+def __init__(self, start: dict, end: dict, it: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.it = it
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers.
+Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+allows reading data and planning the latest offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+"""
+Read all available data from start offset and return the offset that 
next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end offset of this read attempt and the start of 
next read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+"""
+Read all available data from specific start offset and end offset.
+This is invoked during failure recovery to re-read a batch 
deterministically
+in order to achieve exactly once.
+

[PR] [SPARK-47920] add doc for python streaming data source API [spark]

2024-04-19 Thread via GitHub


chaoqin-li1123 opened a new pull request, #46139:
URL: https://github.com/apache/spark/pull/46139

   
   
   ### What changes were proposed in this pull request?
   Add documentation for the Python streaming data source API.
   
   
   ### Why are the changes needed?
   
   Add a user guide to help users develop Python streaming data sources.
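
   A minimal, hedged sketch of the kind of source such a guide would walk 
through (a toy counter source; the method names follow the proposed Python data 
source API, everything else is made up):
   ```
   from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader

   class CounterStreamReader(SimpleDataSourceStreamReader):
       def initialOffset(self) -> dict:
           return {"value": 0}

       def read(self, start: dict):
           # Return ten new rows and the offset the next read starts from.
           lo, hi = start["value"], start["value"] + 10
           return iter([(i,) for i in range(lo, hi)]), {"value": hi}

       def readBetweenOffsets(self, start: dict, end: dict):
           # Deterministic replay for failure recovery.
           return iter([(i,) for i in range(start["value"], end["value"])])

   class CounterDataSource(DataSource):
       @classmethod
       def name(cls):
           return "toy_counter"

       def schema(self):
           return "value INT"

       def simpleStreamReader(self, schema):
           return CounterStreamReader()
   ```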


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] Operator 0.1.0 [spark-kubernetes-operator]

2024-04-19 Thread via GitHub


dongjoon-hyun commented on code in PR #2:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/2#discussion_r1572840033


##
config/checkstyle/checkstyle.xml:
##
@@ -0,0 +1,195 @@
+
+
+"https://checkstyle.org/dtds/configuration_1_3.dtd">
+
+
+
+

Review Comment:
   Please make a spin-off PR for `Static Analysis`-related stuff, @jiangzho .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] Operator 0.1.0 [spark-kubernetes-operator]

2024-04-19 Thread via GitHub


dongjoon-hyun commented on code in PR #2:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/2#discussion_r1572840033


##
config/checkstyle/checkstyle.xml:
##
@@ -0,0 +1,195 @@
+
+
+"https://checkstyle.org/dtds/configuration_1_3.dtd">
+
+
+
+

Review Comment:
   Please make a spin-off PR for `checkstyle`-related stuff, @jiangzho .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] Operator 0.1.0 [spark-kubernetes-operator]

2024-04-19 Thread via GitHub


dongjoon-hyun commented on code in PR #2:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/2#discussion_r1572837826


##
build.gradle:
##
@@ -1,3 +1,16 @@
+buildscript {
+  repositories {
+maven {
+  url = uri("https://plugins.gradle.org/m2/")
+}
+  }
+  dependencies {
+classpath 
"com.github.spotbugs.snom:spotbugs-gradle-plugin:${spotBugsGradlePluginVersion}"
+  }
+}
+
+assert JavaVersion.current().isJava11Compatible(): "Java 11 or newer is 
required"

Review Comment:
   Please drop Java 11.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] Operator 0.1.0 [spark-kubernetes-operator]

2024-04-19 Thread via GitHub


dongjoon-hyun commented on code in PR #2:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/2#discussion_r1572836874


##
build-tools/helm/spark-kubernetes-operator/values.yaml:
##
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+image:
+  repository: spark-kubernetes-operator
+  pullPolicy: IfNotPresent
+  # tag: latest
+  # If image digest is set then it takes precedence and the image tag will be 
ignored
+  # digest: ""
+
+imagePullSecrets: [ ]
+
+operatorDeployment:
+  # Replicas must be 1
+  replicas: 1
+  # Strategy type must be 'Recreate' unless leader election is configured
+  strategy:
+type: Recreate
+  operatorPod:
+priorityClassName: null
+annotations: { }
+labels: { }
+affinity: { }
+nodeSelector: { }
+# Node tolerations for operator pod assignment
+# 
https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/
+tolerations: [ ]
+# Topology spread constrains
+# 
https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/
+topologySpreadConstraints: [ ]
+operatorContainer:
+  jvmArgs: "-XX:+UseG1GC -Xms3G -Xmx3G -Dfile.encoding=UTF8"
+  env:
+  envFrom:
+  volumeMounts: { }
+  resources:
+limits:
+  cpu: "1"
+  ephemeral-storage: 2Gi
+  memory: 4Gi
+requests:
+  cpu: "1"
+  ephemeral-storage: 2Gi
+  memory: 4Gi
+  probes:
+port: 18080
+livenessProbe:
+  periodSeconds: 10
+  initialDelaySeconds: 30
+startupProbe:
+  failureThreshold: 30
+  periodSeconds: 10
+  metrics:
+port: 19090
+  securityContext:
+allowPrivilegeEscalation: false
+capabilities:
+  drop:
+- ALL
+runAsNonRoot: true
+runAsUser: 
+seccompProfile:
+  type: RuntimeDefault
+additionalContainers: { }
+# additionalContainers:
+#  - name: ""
+#image: ""
+volumes: { }
+# volumes:
+#   - name: spark-artifacts
+# hostPath:
+#   path: /tmp/spark/artifacts
+#   type: DirectoryOrCreate
+securityContext: { }
+dnsPolicy:
+dnsConfig:
+
+operatorRbac:
+  serviceAccount:
+create: true
+name: "spark-operator"
+  # If disabled, a Role would be created inside each app namespace for app 
operations
+  clusterRole:
+create: true
+name: "spark-operator-clusterrole"
+  # If disabled, a RoleBinding would be created inside each app namespace for 
app operations
+  clusterRoleBinding:
+create: true
+name: "spark-operator-clusterrolebinding"
+  configManagement:
+roleName: "spark-operator-config-role"
+roleBindingName: "spark-operator-config-role-binding"
+
+appResources:
+  # Create namespace(s), service account(s) and rolebinding(s) for SparkApps, 
if configured
+  # Operator would act at cluster level by default if no app namespace(s) are 
provided
+  namespaces:
+create: true
+# When enabled, operator would by default only watch namespace(s) provided 
in data field
+watchGivenNamespacesOnly: false
+data:
+  #- "spark-demo"
+  #- "spark-demo"

Review Comment:
   May I ask why we have the same comment twice?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] Operator 0.1.0 [spark-kubernetes-operator]

2024-04-19 Thread via GitHub


dongjoon-hyun commented on code in PR #2:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/2#discussion_r1572836113


##
build-tools/helm/spark-kubernetes-operator/values.yaml:
##
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+image:
+  repository: spark-kubernetes-operator
+  pullPolicy: IfNotPresent
+  # tag: latest
+  # If image digest is set then it takes precedence and the image tag will be 
ignored
+  # digest: ""
+
+imagePullSecrets: [ ]
+
+operatorDeployment:
+  # Replicas must be 1
+  replicas: 1
+  # Strategy type must be 'Recreate' unless leader election is configured
+  strategy:
+type: Recreate
+  operatorPod:
+priorityClassName: null
+annotations: { }
+labels: { }
+affinity: { }
+nodeSelector: { }
+# Node tolerations for operator pod assignment
+# 
https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/
+tolerations: [ ]
+# Topology spread constrains
+# 
https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/
+topologySpreadConstraints: [ ]
+operatorContainer:
+  jvmArgs: "-XX:+UseG1GC -Xms3G -Xmx3G -Dfile.encoding=UTF8"
+  env:
+  envFrom:
+  volumeMounts: { }
+  resources:
+limits:
+  cpu: "1"
+  ephemeral-storage: 2Gi
+  memory: 4Gi
+requests:
+  cpu: "1"
+  ephemeral-storage: 2Gi
+  memory: 4Gi
+  probes:
+port: 18080
+livenessProbe:
+  periodSeconds: 10
+  initialDelaySeconds: 30
+startupProbe:
+  failureThreshold: 30
+  periodSeconds: 10
+  metrics:
+port: 19090
+  securityContext:
+allowPrivilegeEscalation: false
+capabilities:
+  drop:
+- ALL
+runAsNonRoot: true
+runAsUser: 
+seccompProfile:
+  type: RuntimeDefault
+additionalContainers: { }
+# additionalContainers:
+#  - name: ""
+#image: ""

Review Comment:
   Please remove 80 ~ 82 because we have line 79.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] Operator 0.1.0 [spark-kubernetes-operator]

2024-04-19 Thread via GitHub


dongjoon-hyun commented on code in PR #2:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/2#discussion_r1572834233


##
.gitignore:
##
@@ -16,3 +16,30 @@ build
 dependencies.lock
 **/dependencies.lock
 gradle/wrapper/gradle-wrapper.jar
+
+# Compiled source #
+###
+*.class
+*.dll
+*.exe
+*.o
+*.so
+*.pyc
+
+# Packages #
+
+*.7z
+*.dmg
+*.gz
+*.iso
+*.rar
+*.tar
+*.zip
+
+# Logs and databases #
+##
+*.log

Review Comment:
   This looks too broad.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] Operator 0.1.0 [spark-kubernetes-operator]

2024-04-19 Thread via GitHub


dongjoon-hyun commented on code in PR #2:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/2#discussion_r1572833180


##
.github/workflows/build_and_test.yml:
##
@@ -26,4 +26,20 @@ jobs:
   GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
 with:
   config: .github/.licenserc.yaml
-
+  test_ci:
+name: "Test CI"
+runs-on: ubuntu-latest
+strategy:
+  matrix:
+java-version: [ 11, 17, 21 ]

Review Comment:
   Please remove Java 11 and make a spin-off PR for this `build_and_test.yml`, 
@jiangzho .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] Operator 0.1.0 [spark-kubernetes-operator]

2024-04-19 Thread via GitHub


dongjoon-hyun commented on code in PR #2:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/2#discussion_r1572833649


##
.gitignore:
##
@@ -16,3 +16,30 @@ build
 dependencies.lock
 **/dependencies.lock
 gradle/wrapper/gradle-wrapper.jar
+
+# Compiled source #
+###
+*.class
+*.dll
+*.exe
+*.o
+*.so
+*.pyc
+
+# Packages #
+
+*.7z

Review Comment:
   For my understanding, where did this come from?



##
.gitignore:
##
@@ -16,3 +16,30 @@ build
 dependencies.lock
 **/dependencies.lock
 gradle/wrapper/gradle-wrapper.jar
+
+# Compiled source #
+###
+*.class
+*.dll
+*.exe
+*.o
+*.so
+*.pyc
+
+# Packages #
+
+*.7z
+*.dmg
+*.gz
+*.iso
+*.rar

Review Comment:
   Ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47909][PYTHON][CONNECT] Parent DataFrame class for Spark Connect and Spark Classic [spark]

2024-04-19 Thread via GitHub


HyukjinKwon commented on PR #46129:
URL: https://github.com/apache/spark/pull/46129#issuecomment-2067140476

   Will fix up the tests soon.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-19 Thread via GitHub


sahnib commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1572618909


##
python/pyspark/sql/datasource.py:
##
@@ -183,11 +186,40 @@ def streamWriter(self, schema: StructType, overwrite: 
bool) -> "DataSourceStream
 message_parameters={"feature": "streamWriter"},
 )
 
+def _streamReader(self, schema: StructType) -> "DataSourceStreamReader":
+try:
+return self.streamReader(schema=schema)
+except PySparkNotImplementedError:
+return 
_SimpleStreamReaderWrapper(self.simpleStreamReader(schema=schema))

Review Comment:
   As we prefer `streamReader` over `simpleStreamReader`, can we call out in 
the docs that `streamReader` will be picked if the user has implemented both 
functions?



##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,200 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+def __init__(self, start: dict, end: dict):
+self.start = start
+self.end = end
+
+
+class PrefetchedCacheEntry(InputPartition):
+def __init__(self, start: dict, end: dict, it: Iterator[Tuple]):
+self.start = start
+self.end = end
+self.it = it
+
+
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers.
+Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+allows reading data and planning the latest offset at the same time.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+"""
+Read all available data from start offset and return the offset that 
next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end offset of this read attempt and the start of 
next read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+"""
+Read all available data from specific start offset and end offset.
+This is invoked during failure recovery to re-read a batch 
deterministically
+in order to achieve exactly once.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+end : dict
+The offset where the reading stop.
+
+Returns
+---
+iterator of :class:`Tuple`\\s
+All the records between start offset and end offset.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read2"},
+)
+
+def commit(self, end: dict) -> None:
+"""
+Informs the source that Spark has completed processing all data for 
offsets less than or
+equal to `end` and will only request offsets greater than `end` in the 
future.
+
+Parameters
+--
+end : dict
+The latest offset that the streaming query has processed for this 
source.
+"""
+...
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+"""
+A private class that wrap :class:`SimpleDataSourceStreamReader` in 
prefetch and cache pattern,
+so that :class:`SimpleDataSourceStreamReader` can integrate with streaming 
engine like an
+ordinary :class:`DataSourceStreamReader`.
+
+current_offset tracks the latest progress of the record prefetching, it is 
initialized to be
+initialOffset() when query 

Re: [PR] [SPARK-47805][SS] Implementing TTL for MapState [spark]

2024-04-19 Thread via GitHub


ericm-db commented on PR #45991:
URL: https://github.com/apache/spark/pull/45991#issuecomment-2067115942

   @HeartSaVioR PTAL, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]

2024-04-19 Thread via GitHub


steveloughran commented on PR #45740:
URL: https://github.com/apache/spark/pull/45740#issuecomment-2067069380

   So both those bindings hand off to PathOutputCommitterFactory(), which looks 
for a committer from the config key mapreduce.outputcommitter.factory.class:
   * FileOutputCommitterFactory: classic committer
   * NamedCommitterFactory: class in mapreduce.outputcommitter.named.classname
   
   then falls back to the mapreduce.outputcommitter.factory.scheme.SCHEMA factory 
definition.
   
   The idea being: you get an fs-specific one unless you ask for something else.
   * the parquet one is there because parquet is fussy about its committer 
subclasses; that should be reviewed (where?)
   * and PathOutputCommitProtocol is probably surplus now that spark can use 
PathOutputCommitter everywhere...
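
   For example, the lookup above can be steered from Spark configuration (a 
sketch only; double-check the factory class names against the Hadoop release in 
use):
   ```
   from pyspark.sql import SparkSession

   spark = (
       SparkSession.builder
       # Explicit factory for every filesystem:
       .config("spark.hadoop.mapreduce.outputcommitter.factory.class",
               "org.apache.hadoop.mapreduce.lib.output.NamedCommitterFactory")
       .config("spark.hadoop.mapreduce.outputcommitter.named.classname",
               "<your committer class>")
       # Or a per-scheme factory (here for the s3a scheme):
       .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
               "<factory class for s3a>")
       .getOrCreate()
   )
   ```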
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [WIP][SPARK-47907] Put bang under a config [spark]

2024-04-19 Thread via GitHub


srielau opened a new pull request, #46138:
URL: https://github.com/apache/spark/pull/46138

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47418][SQL] Add hand-crafted implementations for lowercase unicode-aware contains, startsWith and endsWith and optimize UTF8_BINARY_LCASE [spark]

2024-04-19 Thread via GitHub


vladimirg-db commented on code in PR #46082:
URL: https://github.com/apache/spark/pull/46082#discussion_r1572665195


##
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
##
@@ -359,10 +414,97 @@ public boolean startsWith(final UTF8String prefix) {
 return matchAt(prefix, 0);
   }
 
+  /**
+   * Checks whether `prefix` is a prefix of `this` in a lowercase 
unicode-aware manner
+   *
+   * This function is written in a way which avoids excessive allocations in 
case if we work with
+   * bare ASCII-character strings.
+   */
+  public boolean startsWithInLowerCase(final UTF8String prefix) {
+// No way to match sizes of strings for early return, since single 
grapheme can be expanded
+// into several independent ones in lowercase
+if (prefix.numBytes == 0) {
+  return true;
+}
+if (numBytes == 0) {
+  return false;
+}
+
+for (var i = 0; i < prefix.numBytes; i++) {

Review Comment:
   I wanted to do something like this, but I think that this simple 
implementation is better for the following reasons:
   - We couldn't do something like `matchAt(...)` by using an offset in `this`, 
because we actually don't know the exact offset (at least without the full 
conversion to lowercase), since the lowercase representation of a Unicode string 
can actually be longer (or shorter) than the original. That's why the prefix 
check loops forward and the suffix check loops backward (see the sketch after 
this list)
   - We probably could do a parametrized function, which does something like 
`startOffset + this.offset + direction * i`, which would work for the loop 
indexing, but I can't think of the suitable math for the inner `if` from the 
top of my head. It's probably possible, but would be much more involved, since 
that code differs significantly in terms of indexing
   - We could just substitute the loop body with the closure, and pass it as a 
strategy (since that's what differs), but introducing looping virtual call in 
an optimization PR wouldn't be reasonable
   - It reads trivially, which is good for the reader
   - The copy-paste here is not that large
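
   The sketch referenced above: a tiny Python illustration of why lowercasing 
can change the length, so a fixed suffix offset can't be precomputed.
   ```
   # 'İ' (U+0130) lowercases to 'i' plus U+0307 (combining dot above),
   # i.e. one code point becomes two.
   s = "İstanbul"
   print(len(s), len(s.lower()))   # 8 9
   ```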



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-45265][SQL] Supporting Hive 4.0 Metastore [spark]

2024-04-19 Thread via GitHub


dongjoon-hyun closed pull request #45801: [SPARK-45265][SQL] Supporting Hive 
4.0 Metastore
URL: https://github.com/apache/spark/pull/45801


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47900] Fix check for implicit (UTF8_BINARY) collation [spark]

2024-04-19 Thread via GitHub


stefankandic commented on PR #46116:
URL: https://github.com/apache/spark/pull/46116#issuecomment-2066908704

   @cloud-fan all checks passing after restarting the failed job


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47418][SQL] Add hand-crafted implementations for lowercase unicode-aware contains, startsWith and endsWith and optimize UTF8_BINARY_LCASE [spark]

2024-04-19 Thread via GitHub


dbatomic commented on code in PR #46082:
URL: https://github.com/apache/spark/pull/46082#discussion_r1572603566


##
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
##
@@ -359,10 +414,97 @@ public boolean startsWith(final UTF8String prefix) {
 return matchAt(prefix, 0);
   }
 
+  /**
+   * Checks whether `prefix` is a prefix of `this` in a lowercase 
unicode-aware manner
+   *
+   * This function is written in a way which avoids excessive allocations in 
case if we work with
+   * bare ASCII-character strings.
+   */
+  public boolean startsWithInLowerCase(final UTF8String prefix) {
+// No way to match sizes of strings for early return, since single 
grapheme can be expanded
+// into several independent ones in lowercase
+if (prefix.numBytes == 0) {
+  return true;
+}
+if (numBytes == 0) {
+  return false;
+}
+
+for (var i = 0; i < prefix.numBytes; i++) {

Review Comment:
   Would it be possible to share the implementation between 
`startsWithInLowerCase` and `endsWithInLowerCase`?
   
   E.g. if you were to use two variables (an offset into the prefix/suffix and 
an offset into `this`), you should be able to share the code between the two - 
something like the sketch below.
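
   For what it's worth, a minimal char-level Scala sketch of that idea (the 
names and the fixed-width-character assumption are mine; the actual byte-level, 
case-expansion-aware `UTF8String` code is more involved, as discussed in the 
PR):

   ```scala
   // Hypothetical sketch: prefix and suffix checks share one helper that is
   // driven by a start offset into `self` and walks both strings forward.
   object SharedLowercaseMatchSketch {
     private def matchAtInLowerCase(self: String, selfStart: Int, pattern: String): Boolean =
       pattern.indices.forall { i =>
         Character.toLowerCase(self.charAt(selfStart + i)) ==
           Character.toLowerCase(pattern.charAt(i))
       }

     def startsWithInLowerCase(self: String, prefix: String): Boolean =
       prefix.length <= self.length && matchAtInLowerCase(self, 0, prefix)

     def endsWithInLowerCase(self: String, suffix: String): Boolean =
       suffix.length <= self.length &&
         matchAtInLowerCase(self, self.length - suffix.length, suffix)
   }
   ```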



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47902][SQL]Making Compute Current Time* expressions foldable [spark]

2024-04-19 Thread via GitHub


dbatomic commented on code in PR #46120:
URL: https://github.com/apache/spark/pull/46120#discussion_r1572580087


##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala:
##
@@ -811,4 +811,10 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite 
with SQLHelper with Quer
 "This should have been converted during analysis."))
 )
   }
+
+  test("check that current time is foldable") {
+val rnd =
+  Rand(Cast(UnixTimestamp(CurrentDate(), Literal("yyyy-MM-dd HH:mm:ss")), 
IntegerType))

Review Comment:
   oh, yes, sure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47825][DSTREAMS][3.5] Make `KinesisTestUtils` & `WriteInputFormatTestDataGenerator` deprecated [spark]

2024-04-19 Thread via GitHub


dongjoon-hyun commented on PR #46019:
URL: https://github.com/apache/spark/pull/46019#issuecomment-2066830650

   Merged to branch-3.5.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47825][DSTREAMS][3.5] Make `KinesisTestUtils` & `WriteInputFormatTestDataGenerator` deprecated [spark]

2024-04-19 Thread via GitHub


dongjoon-hyun commented on PR #46019:
URL: https://github.com/apache/spark/pull/46019#issuecomment-2066829697

   Thank you all!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47825][DSTREAMS][3.5] Make `KinesisTestUtils` & `WriteInputFormatTestDataGenerator` deprecated [spark]

2024-04-19 Thread via GitHub


dongjoon-hyun closed pull request #46019: [SPARK-47825][DSTREAMS][3.5] Make 
`KinesisTestUtils` & `WriteInputFormatTestDataGenerator` deprecated
URL: https://github.com/apache/spark/pull/46019


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46632][SQL] Fix subexpression elimination when equivalent ternary expressions have different children [spark]

2024-04-19 Thread via GitHub


peter-toth commented on code in PR #46135:
URL: https://github.com/apache/spark/pull/46135#discussion_r1572557495


##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala:
##
@@ -494,6 +494,18 @@ class SubexpressionEliminationSuite extends SparkFunSuite 
with ExpressionEvalHel
 checkShortcut(Or(equal, Literal(true)), 1)
 checkShortcut(Not(And(equal, Literal(false))), 1)
   }
+
+  test("Equivalent ternary expressions have different children") {

Review Comment:
   Hmm, this is because `((1 + 2) + 3)` is `semanticEquals` to `((3 + 1) + 2)`, 
but their children are different. So when we compute the common elements 
between the above 2 here:
   
   
https://github.com/apache/spark/blob/0d553d06fe2f05571531ada0f08c0cc45418e941/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala#L112-L121
   
   only `((1 + 2) + 3)` remains in `localEquivalenceMap`, but its children 
don't. So later, when we want to remove `((1 + 2) + 3)`, we can't find its 
children...
   
   I think we could fix the above code that computes `localEquivalenceMap`, but 
the change in this PR is simpler and doesn't seem to do any harm.
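
   For context, the reproducer from the PR description (which hits the 
`IllegalStateException` without this change) is roughly:

   ```scala
   // Two CASE WHEN predicates that are semantically equal
   // ((a + b + c) vs (c + a + b)) but have different children.
   Seq((1, 2, 3), (2, 3, 4)).toDF("a", "b", "c")
     .selectExpr("case when a + b + c > 3 then 1 when c + a + b > 0 then 2 else 0 end as d")
     .show()
   ```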



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47828][CONNECT][PYTHON][3.4] DataFrameWriterV2.overwrite fails with invalid plan [spark]

2024-04-19 Thread via GitHub


dongjoon-hyun closed pull request #46051: [SPARK-47828][CONNECT][PYTHON][3.4] 
DataFrameWriterV2.overwrite fails with invalid plan
URL: https://github.com/apache/spark/pull/46051


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47915][BUILD][K8S] Upgrade `kubernetes-client` to 6.12.1 [spark]

2024-04-19 Thread via GitHub


dongjoon-hyun closed pull request #46137: [SPARK-47915][BUILD][K8S] Upgrade 
`kubernetes-client` to 6.12.1
URL: https://github.com/apache/spark/pull/46137


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47902][SQL]Making Compute Current Time* expressions foldable [spark]

2024-04-19 Thread via GitHub


cloud-fan commented on code in PR #46120:
URL: https://github.com/apache/spark/pull/46120#discussion_r1572478404


##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala:
##
@@ -811,4 +811,10 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite 
with SQLHelper with Quer
 "This should have been converted during analysis."))
 )
   }
+
+  test("check that current time is foldable") {
+val rnd =
+  Rand(Cast(UnixTimestamp(CurrentDate(), Literal("yyyy-MM-dd HH:mm:ss")), 
IntegerType))

Review Comment:
   nit: we can simply do `Month(CurrentDate())` to turn the date into an int.
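
   i.e. something along these lines (just a sketch of what the simplified test 
expression might look like):

   ```scala
   // Month(CurrentDate()) already yields an IntegerType expression,
   // so no Cast/UnixTimestamp round-trip is needed for the Rand seed.
   val rnd = Rand(Month(CurrentDate()))
   ```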



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP][SPARK-47913][CORE][SQL][DSTREAMS][TESTS] Move `src/test/java/test/*` to `src/test/java/*` [spark]

2024-04-19 Thread via GitHub


panbingkun closed pull request #46134: 
[WIP][SPARK-47913][CORE][SQL][DSTREAMS][TESTS] Move `src/test/java/test/*` to 
`src/test/java/*`
URL: https://github.com/apache/spark/pull/46134


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47890][CONNECT][PYTHON] Add variant functions to Scala and Python. [spark]

2024-04-19 Thread via GitHub


chenhao-db commented on code in PR #46123:
URL: https://github.com/apache/spark/pull/46123#discussion_r1572455657


##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala:
##
@@ -2485,6 +2485,30 @@ class PlanGenerationTestSuite
 fn.to_json(fn.col("d"), Map(("timestampFormat", "dd/MM/")))
   }
 
+  functionTest("parse_json") {

Review Comment:
   Done. Thanks for pointing it out!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47414][SQL] Lowercase collation support for regexp expressions [spark]

2024-04-19 Thread via GitHub


nikolamand-db commented on code in PR #46077:
URL: https://github.com/apache/spark/pull/46077#discussion_r1572379229


##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollationRegexpExpressionSuite.scala:
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.util.CollationFactory
+import org.apache.spark.sql.types._
+
+class CollationRegexpExpressionSuite extends SparkFunSuite with 
ExpressionEvalHelper {

Review Comment:
   Missed that `Expression` vs `Expressions` difference. Maybe it would be even 
better to give them the same name; this is kind of confusing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47414][SQL] Lowercase collation support for regexp expressions [spark]

2024-04-19 Thread via GitHub


uros-db commented on code in PR #46077:
URL: https://github.com/apache/spark/pull/46077#discussion_r1572369335


##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollationRegexpExpressionSuite.scala:
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.util.CollationFactory
+import org.apache.spark.sql.types._
+
+class CollationRegexpExpressionSuite extends SparkFunSuite with 
ExpressionEvalHelper {

Review Comment:
   well that is a different kind of suite (SparkFunSuite), used for sql tests, 
so I put it in the sql/ package, whereas this one is for unit testing 
(QueryTest) and has to be in the expressions/ package.
   
   I think they should indeed stay separate.
   
   btw the names are slightly different:
   .../expressions/CollationRegexpExpressionSuite.scala
   sq/CollationRegexpExpressionsSuite.scala



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47297][SQL] Add collations support to split regex expression [spark]

2024-04-19 Thread via GitHub


nikolamand-db closed pull request #45856: [SPARK-47297][SQL] Add collations 
support to split regex expression
URL: https://github.com/apache/spark/pull/45856


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47297][SQL] Add collations support to split regex expression [spark]

2024-04-19 Thread via GitHub


nikolamand-db commented on PR #45856:
URL: https://github.com/apache/spark/pull/45856#issuecomment-2066549783

   Closing, as we have a new approach for all regex functions: 
https://github.com/apache/spark/pull/46077.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47414][SQL] Lowercase collation support for regexp expressions [spark]

2024-04-19 Thread via GitHub


nikolamand-db commented on code in PR #46077:
URL: https://github.com/apache/spark/pull/46077#discussion_r1572342093


##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollationRegexpExpressionSuite.scala:
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.util.CollationFactory
+import org.apache.spark.sql.types._
+
+class CollationRegexpExpressionSuite extends SparkFunSuite with 
ExpressionEvalHelper {
+
+  test("Like/ILike/RLike expressions with collated strings") {
+case class LikeTestCase[R](l: String, regexLike: String, regexRLike: 
String, collation: String,
+   expectedLike: R, expectedILike: R, 
expectedRLike: R)
+val testCases = Seq(
+  LikeTestCase("AbC", "%AbC%", ".b.", "UTF8_BINARY", true, true, true),
+  LikeTestCase("AbC", "%ABC%", ".B.", "UTF8_BINARY", false, true, false),
+  LikeTestCase("AbC", "%abc%", ".b.", "UTF8_BINARY_LCASE", true, true, 
true),
+  LikeTestCase("", "", "", "UTF8_BINARY_LCASE", true, true, true),
+  LikeTestCase("Foo", "", "", "UTF8_BINARY_LCASE", false, false, true),
+  LikeTestCase("", "%foo%", ".o.", "UTF8_BINARY_LCASE", false, false, 
false),
+  LikeTestCase("AbC", "%ABC%", ".B.", "UNICODE", false, true, false),
+  LikeTestCase(null, "%foo%", ".o.", "UNICODE", null, null, null),
+  LikeTestCase("Foo", null, null, "UNICODE", null, null, null),
+  LikeTestCase(null, null, null, "UNICODE", null, null, null)
+)
+testCases.foreach(t => {
+  // Like
+  checkEvaluation(Like(
+Literal.create(t.l, 
StringType(CollationFactory.collationNameToId(t.collation))),
+Literal.create(t.regexLike, StringType), '\\'), t.expectedLike)
+  // ILike
+  checkEvaluation(ILike(
+Literal.create(t.l, 
StringType(CollationFactory.collationNameToId(t.collation))),
+Literal.create(t.regexLike, StringType), '\\').replacement, 
t.expectedILike)
+  // RLike
+  checkEvaluation(RLike(
+Literal.create(t.l, 
StringType(CollationFactory.collationNameToId(t.collation))),
+Literal.create(t.regexRLike, StringType)), t.expectedRLike)
+})
+  }
+
+  test("StringSplit expression with collated strings") {
+case class StringSplitTestCase[R](s: String, r: String, collation: String, 
expected: R)
+val testCases = Seq(
+  StringSplitTestCase("1A2B3C", "[ABC]", "UTF8_BINARY", Seq("1", "2", "3", 
"")),
+  StringSplitTestCase("1A2B3C", "[abc]", "UTF8_BINARY", Seq("1A2B3C")),
+  StringSplitTestCase("1A2B3C", "[ABC]", "UTF8_BINARY_LCASE", Seq("1", 
"2", "3", "")),
+  StringSplitTestCase("1A2B3C", "[abc]", "UTF8_BINARY_LCASE", Seq("1", 
"2", "3", "")),
+  StringSplitTestCase("1A2B3C", "[1-9]+", "UNICODE", Seq("", "A", "B", 
"C")),
+  StringSplitTestCase("", "", "UNICODE", Seq("")),
+  StringSplitTestCase("1A2B3C", "", "UNICODE", Seq("1", "A", "2", "B", 
"3", "C")),
+  StringSplitTestCase("", "[1-9]+", "UNICODE", Seq("")),
+  StringSplitTestCase(null, "[1-9]+", "UNICODE", null),
+  StringSplitTestCase("1A2B3C", null, "UNICODE", null),
+  StringSplitTestCase(null, null, "UNICODE", null)
+)
+testCases.foreach(t => {
+  // StringSplit
+  checkEvaluation(StringSplit(
+Literal.create(t.s, 
StringType(CollationFactory.collationNameToId(t.collation))),
+Literal.create(t.r, StringType), -1), t.expected)
+})
+  }
+
+  test("Regexp expressions with collated strings") {
+case class RegexpTestCase[R](l: String, r: String, collation: String,
+ expectedExtract: R, expectedExtractAll: R, 
expectedCount: R)

Review Comment:
   ditto



##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollationRegexpExpressionSuite.scala:
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional 

Re: [PR] [SPARK-47545][CONNECT] Dataset `observe` support for the Scala client [spark]

2024-04-19 Thread via GitHub


xupefei commented on code in PR #45701:
URL: https://github.com/apache/spark/pull/45701#discussion_r1572265083


##
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##
@@ -27,18 +27,22 @@ import org.apache.arrow.vector.ipc.message.{ArrowMessage, 
ArrowRecordBatch}
 import org.apache.arrow.vector.types.pojo
 
 import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.ExecutePlanResponse.ObservedMetrics
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, RowEncoder}
 import 
org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ProductEncoder, 
UnboundRowEncoder}
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.connect.client.arrow.{AbstractMessageIterator, 
ArrowDeserializingIterator, ConcatenatingArrowStreamReader, MessageIterator}
-import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, 
LiteralValueProtoConverter}
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.ArrowUtils
 
 private[sql] class SparkResult[T](
 responses: CloseableIterator[proto.ExecutePlanResponse],
 allocator: BufferAllocator,
 encoder: AgnosticEncoder[T],
-timeZoneId: String)
+timeZoneId: String,
+setObservationMetricsOpt: Option[(Long, Option[Map[String, Any]]) => Unit] 
= None)

Review Comment:
   True. Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47545][CONNECT] Dataset `observe` support for the Scala client [spark]

2024-04-19 Thread via GitHub


xupefei commented on code in PR #45701:
URL: https://github.com/apache/spark/pull/45701#discussion_r1572264833


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##
@@ -813,6 +823,28 @@ class SparkSession private[sql] (
* Set to false to prevent client.releaseSession on close() (testing only)
*/
   private[sql] var releaseSessionOnClose = true
+
+  private[sql] def registerObservation(planId: Long, observation: 
Observation): Unit = {
+// makes this class thread-safe:
+// only the first thread entering this block can set sparkSession
+// all other threads will see the exception, as it is only allowed to do 
this once
+observation.synchronized {
+  if (observationRegistry.contains(planId)) {
+throw new IllegalArgumentException("An Observation can be used with a 
Dataset only once")
+  }
+  observationRegistry.put(planId, observation)
+}
+  }
+
+  private[sql] def setMetricsAndUnregisterObservation(
+  planId: Long,
+  metrics: Option[Map[String, Any]]): Unit = {
+observationRegistry.get(planId).map { observation =>
+  if (observation.setMetricsAndNotify(metrics)) {
+observationRegistry.remove(planId)

Review Comment:
   But nevertheless we don't need to handle this case in Connect because we 
look up using Plan Id.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47545][CONNECT] Dataset `observe` support for the Scala client [spark]

2024-04-19 Thread via GitHub


xupefei commented on code in PR #45701:
URL: https://github.com/apache/spark/pull/45701#discussion_r1572263903


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##
@@ -813,6 +823,28 @@ class SparkSession private[sql] (
* Set to false to prevent client.releaseSession on close() (testing only)
*/
   private[sql] var releaseSessionOnClose = true
+
+  private[sql] def registerObservation(planId: Long, observation: 
Observation): Unit = {
+// makes this class thread-safe:
+// only the first thread entering this block can set sparkSession
+// all other threads will see the exception, as it is only allowed to do 
this once
+observation.synchronized {
+  if (observationRegistry.contains(planId)) {
+throw new IllegalArgumentException("An Observation can be used with a 
Dataset only once")
+  }
+  observationRegistry.put(planId, observation)
+}
+  }
+
+  private[sql] def setMetricsAndUnregisterObservation(
+  planId: Long,
+  metrics: Option[Map[String, Any]]): Unit = {
+observationRegistry.get(planId).map { observation =>
+  if (observation.setMetricsAndNotify(metrics)) {
+observationRegistry.remove(planId)

Review Comment:
   I looked at the code. It seems it's valid to check for non-empty metrics in 
Spark Core:
   ```scala
   private[spark] def onFinish(qe: QueryExecution): Unit = {
 ...
 val row: Option[Row] = qe.observedMetrics.get(name)
 val metrics: Option[Map[String, Any]] = row.map(r => 
r.getValuesMap[Any](r.schema.fieldNames.toImmutableArraySeq))
 if (setMetricsAndNotify(metrics)) {
   unregister()
 }
   }
   ```
   
   The `Option` is for the case where the query finishes without any metrics. 
Is that possible?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47412][SQL] Add Collation Support for LPad/RPad. [spark]

2024-04-19 Thread via GitHub


uros-db commented on code in PR #46041:
URL: https://github.com/apache/spark/pull/46041#discussion_r1572231615


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CollationTypeCasts.scala:
##
@@ -54,7 +54,7 @@ object CollationTypeCasts extends TypeCoercionRule {
 
 case otherExpr @ (
   _: In | _: InSubquery | _: CreateArray | _: ArrayJoin | _: Concat | _: 
Greatest | _: Least |
-  _: Coalesce | _: BinaryExpression | _: ConcatWs) =>
+  _: Coalesce | _: BinaryExpression | _: ConcatWs | _: StringLPad | _: 
StringRPad) =>

Review Comment:
   unfortunately, CollationTypeCasts doesn't quite do what you would expect 
based on those descriptions - we're working on fixing that
   
   but if you were to run this:
   ```
 test("Support StringLPad string expressions with collation") {
   val query = "SELECT lpad('abc', collate('5', 'unicode_ci'), ' ')"
   checkAnswer(sql(query), Row("  abc"))
   assert(sql(query).schema.fields.head.dataType.sameType(StringType(3)))
 }
   ```
   you would find that the resulting datatype is UNICODE_CI (instead of 
UTF8_BINARY, which would be the correct type).
   
   The reason for this behaviour is the ordering of rules in `TypeCoercion` - 
there you'll find that, for example, `CollationTypeCasts` comes before 
`FunctionArgumentConversion`, so in the above example the collation is taken 
from '5' before it is converted into the integer 5.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47412][SQL] Add Collation Support for LPad/RPad. [spark]

2024-04-19 Thread via GitHub


uros-db commented on code in PR #46041:
URL: https://github.com/apache/spark/pull/46041#discussion_r1572232587


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CollationTypeCasts.scala:
##
@@ -54,7 +54,7 @@ object CollationTypeCasts extends TypeCoercionRule {
 
 case otherExpr @ (
   _: In | _: InSubquery | _: CreateArray | _: ArrayJoin | _: Concat | _: 
Greatest | _: Least |
-  _: Coalesce | _: BinaryExpression | _: ConcatWs) =>
+  _: Coalesce | _: BinaryExpression | _: ConcatWs | _: StringLPad | _: 
StringRPad) =>

Review Comment:
   so for now, we should only call `collateToSingleType` on the correct 
parameters - in this case, the first and the third



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47915][BUILD][K8S] Upgrade `kubernetes-client` to 6.12.1 [spark]

2024-04-19 Thread via GitHub


bjornjorgensen commented on PR #46137:
URL: https://github.com/apache/spark/pull/46137#issuecomment-2066277141

   
![image](https://github.com/apache/spark/assets/47577197/a166f540-9254-4614-af83-6848206e6643)
   
   @dongjoon-hyun FYI 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47545][CONNECT] Dataset `observe` support for the Scala client [spark]

2024-04-19 Thread via GitHub


xupefei commented on code in PR #45701:
URL: https://github.com/apache/spark/pull/45701#discussion_r1572148735


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##
@@ -813,6 +823,28 @@ class SparkSession private[sql] (
* Set to false to prevent client.releaseSession on close() (testing only)
*/
   private[sql] var releaseSessionOnClose = true
+
+  private[sql] def registerObservation(planId: Long, observation: 
Observation): Unit = {
+// makes this class thread-safe:
+// only the first thread entering this block can set sparkSession
+// all other threads will see the exception, as it is only allowed to do 
this once
+observation.synchronized {
+  if (observationRegistry.contains(planId)) {
+throw new IllegalArgumentException("An Observation can be used with a 
Dataset only once")
+  }
+  observationRegistry.put(planId, observation)
+}
+  }
+
+  private[sql] def setMetricsAndUnregisterObservation(
+  planId: Long,
+  metrics: Option[Map[String, Any]]): Unit = {
+observationRegistry.get(planId).map { observation =>
+  if (observation.setMetricsAndNotify(metrics)) {
+observationRegistry.remove(planId)

Review Comment:
   I had the same question when I looked at the code. In Spark Core we only 
de-register the Observation when some non-empty metrics are set. I am not sure 
under which circumstance the metrics can be empty.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47545][CONNECT] Dataset `observe` support for the Scala client [spark]

2024-04-19 Thread via GitHub


xupefei commented on code in PR #45701:
URL: https://github.com/apache/spark/pull/45701#discussion_r1572148735


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##
@@ -813,6 +823,28 @@ class SparkSession private[sql] (
* Set to false to prevent client.releaseSession on close() (testing only)
*/
   private[sql] var releaseSessionOnClose = true
+
+  private[sql] def registerObservation(planId: Long, observation: 
Observation): Unit = {
+// makes this class thread-safe:
+// only the first thread entering this block can set sparkSession
+// all other threads will see the exception, as it is only allowed to do 
this once
+observation.synchronized {
+  if (observationRegistry.contains(planId)) {
+throw new IllegalArgumentException("An Observation can be used with a 
Dataset only once")
+  }
+  observationRegistry.put(planId, observation)
+}
+  }
+
+  private[sql] def setMetricsAndUnregisterObservation(
+  planId: Long,
+  metrics: Option[Map[String, Any]]): Unit = {
+observationRegistry.get(planId).map { observation =>
+  if (observation.setMetricsAndNotify(metrics)) {
+observationRegistry.remove(planId)

Review Comment:
   I had the same question when I looked at the code. In Spark Core we only 
de-register the Observation when some non-empty metrics are set, so I decided to 
keep it the same in Connect. I am not sure under which circumstance the metrics 
can be empty.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47915][BUILD][K8S] Upgrade `kubernetes-client` to 6.12.1 [spark]

2024-04-19 Thread via GitHub


bjornjorgensen opened a new pull request, #46137:
URL: https://github.com/apache/spark/pull/46137

   ### What changes were proposed in this pull request?
   Upgrade `kubernetes-client` from 6.12.0 to 6.12.1
   
   ### Why are the changes needed?
   [Release 
notes](https://github.com/fabric8io/kubernetes-client/releases/tag/v6.12.1)
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   Pass GA
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47545][CONNECT] Dataset `observe` support for the Scala client [spark]

2024-04-19 Thread via GitHub


xupefei commented on code in PR #45701:
URL: https://github.com/apache/spark/pull/45701#discussion_r1572146508


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##
@@ -813,6 +823,28 @@ class SparkSession private[sql] (
* Set to false to prevent client.releaseSession on close() (testing only)
*/
   private[sql] var releaseSessionOnClose = true
+
+  private[sql] def registerObservation(planId: Long, observation: 
Observation): Unit = {
+// makes this class thread-safe:
+// only the first thread entering this block can set sparkSession
+// all other threads will see the exception, as it is only allowed to do 
this once
+observation.synchronized {

Review Comment:
   Agreed. I changed it to a concurrent hash map.



##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##
@@ -813,6 +823,28 @@ class SparkSession private[sql] (
* Set to false to prevent client.releaseSession on close() (testing only)
*/
   private[sql] var releaseSessionOnClose = true
+
+  private[sql] def registerObservation(planId: Long, observation: 
Observation): Unit = {
+// makes this class thread-safe:
+// only the first thread entering this block can set sparkSession
+// all other threads will see the exception, as it is only allowed to do 
this once
+observation.synchronized {

Review Comment:
   Agreed. I changed it to a concurrent hashmap.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47412][SQL] Add Collation Support for LPad/RPad. [spark]

2024-04-19 Thread via GitHub


GideonPotok commented on code in PR #46041:
URL: https://github.com/apache/spark/pull/46041#discussion_r1572135059


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CollationTypeCasts.scala:
##
@@ -54,7 +54,7 @@ object CollationTypeCasts extends TypeCoercionRule {
 
 case otherExpr @ (
   _: In | _: InSubquery | _: CreateArray | _: ArrayJoin | _: Concat | _: 
Greatest | _: Least |
-  _: Coalesce | _: BinaryExpression | _: ConcatWs) =>
+  _: Coalesce | _: BinaryExpression | _: ConcatWs | _: StringLPad | _: 
StringRPad) =>

Review Comment:
   @uros-db  That is not so, in my understanding.
   
   `collateToSingleType` calls  `getOutputCollation`  and then does 
`exprs.map(e => castStringType(e, st).getOrElse(e))`
   
   `getOutputCollation`, according to the function comment blocks (verified by 
me looking at the code), "will only be affected by collated StringTypes or 
complex DataTypes with collated StringTypes (e.g. ArrayType)"
   
   
   `castStringType` , according to the function comment blocks (verified by me 
looking at the code), "casts given expression to collated StringType with id 
equal to collationId only if expression has StringType in the first place."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47906][PYTHON][DOCS] Fix docstring and type hint of `hll_union_agg` [spark]

2024-04-19 Thread via GitHub


zhengruifeng commented on PR #46128:
URL: https://github.com/apache/spark/pull/46128#issuecomment-2066182638

   thanks @HyukjinKwon 
   
   merged to master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47906][PYTHON][DOCS] Fix docstring and type hint of `hll_union_agg` [spark]

2024-04-19 Thread via GitHub


zhengruifeng closed pull request #46128: [SPARK-47906][PYTHON][DOCS] Fix 
docstring and type hint of `hll_union_agg`
URL: https://github.com/apache/spark/pull/46128


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46632][SQL] Fix subexpression elimination when equivalent ternary expressions have different children [spark]

2024-04-19 Thread via GitHub


zml1206 commented on PR #46135:
URL: https://github.com/apache/spark/pull/46135#issuecomment-2066175301

   cc @peter-toth @cloud-fan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47914][SQL] Do not display the splits parameter in Range [spark]

2024-04-19 Thread via GitHub


guixiaowen opened a new pull request, #46136:
URL: https://github.com/apache/spark/pull/46136

   
   
   ### What changes were proposed in this pull request?
   [SQL]
   
   explain extended select * from range(0, 4);
   
   Before this PR, `splits` is displayed as `None` in the logical plans even 
when it has not been set:
   
   plan
   
   == Parsed Logical Plan ==
   
   'Project [*]
   
   +- 'UnresolvedTableValuedFunction [range], [0, 4]
   

   
   == Analyzed Logical Plan ==
   
   id: bigint
   
   Project [id#11L]
   
   +- Range (0, 4, step=1, splits=None)
   

   
   == Optimized Logical Plan ==
   
   Range (0, 4, step=1, splits=None)
   

   
   == Physical Plan ==
   
   *(1) Range (0, 4, step=1, splits=1)
   
   
   
   After this PR, `splits` is not displayed in the logical plans when it is not 
set; it is still displayed once it has been set:
   
   plan
   
   == Parsed Logical Plan ==
   
   'Project [*]
   
   +- 'UnresolvedTableValuedFunction [range], [0, 4]
   

   
   == Analyzed Logical Plan ==
   
   id: bigint
   
   Project [id#11L]
   
   +- Range (0, 4, step=1)
   

   
   == Optimized Logical Plan ==
   
   Range (0, 4, step=1)
   

   
   == Physical Plan ==
   
   *(1) Range (0, 4, step=1, splits=1)
   
   
   
   ### Why are the changes needed?
   If `splits` is not set, it is still displayed in the logical plans as 
`None`, which is not very user-friendly.
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47911][SQL] Introduces a universal BinaryFormatter to make binary output consistent [spark]

2024-04-19 Thread via GitHub


yaooqinn commented on code in PR #46133:
URL: https://github.com/apache/spark/pull/46133#discussion_r1572038458


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ToStringBase.scala:
##
@@ -414,3 +413,24 @@ trait ToStringBase { self: UnaryExpression with 
TimeZoneAwareExpression =>
  """.stripMargin
   }
 }
+
+object ToStringBase {
+  type BinaryFormatter = Array[Byte] => UTF8String
+
+  def getBinaryFormatter: BinaryFormatter = {
+val style = SQLConf.get.getConf(SQLConf.BINARY_OUTPUT_STYLE)
+BinaryOutputStyle.withName(style) match {
+  case BinaryOutputStyle.UTF8 =>
+array => UTF8String.fromBytes(array)
+  case BinaryOutputStyle.BASIC => array =>
+UTF8String.fromString(array.mkString("[", ", ", "]"))
+  case BinaryOutputStyle.BASE64 => array =>
+
UTF8String.fromString(java.util.Base64.getEncoder.withoutPadding().encodeToString(array))

Review Comment:
   
https://github.com/apache/hive/pull/1261/files#diff-107090e9a0306b19931b1b61f076c2a66afa7cfb1e49fea8624ef950ceb170d0R167
 As we already support hive-service-rpc 4.0.0, I added this style too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] Fix subexpression elimination when equivalent ternary expressions have different children [spark]

2024-04-19 Thread via GitHub


zml1206 opened a new pull request, #46135:
URL: https://github.com/apache/spark/pull/46135

   
   
   
   
   ### What changes were proposed in this pull request?
   Remove the unexpected exception thrown in 
`EquivalentExpressions.updateExprInMap()`. Equivalent expressions may have 
different children, so it can happen that an expression is not in the map while 
its useCount is -1.
   For example, before this PR the following throws an IllegalStateException:
   ```
   Seq((1, 2, 3), (2, 3, 4)).toDF("a", "b", "c")
 .selectExpr("case when a + b + c>3 then 1 when c + a + b>0 then 2 else 
0 end as d").show()
   ```
   
   
   ### Why are the changes needed?
   Fix bug.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   New unit test; before this PR it throws `IllegalStateException: *** with use 
count: -1`.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [WIP] Move `src/test/java/test/*` to `src/test/java/*` [spark]

2024-04-19 Thread via GitHub


panbingkun opened a new pull request, #46134:
URL: https://github.com/apache/spark/pull/46134

   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47909][PYTHON][CONNECT] Parent DataFrame class for Spark Connect and Spark Classic [spark]

2024-04-19 Thread via GitHub


HyukjinKwon commented on code in PR #46129:
URL: https://github.com/apache/spark/pull/46129#discussion_r1572006465


##
python/pyspark/sql/connect/dataframe.py:
##
@@ -2306,7 +2183,7 @@ def _test() -> None:
 )
 
 (failure_count, test_count) = doctest.testmod(
-pyspark.sql.connect.dataframe,
+pyspark.sql.dataframe,

Review Comment:
   It does inherit the docstrings, but `doctest` cannot pick up inherited 
docstrings. So I manually switch it to `pyspark.sql.dataframe` to run the 
doctests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [DRAFT][SPARK-47414][SQL] Lowercase collation support for regexp expressions [spark]

2024-04-19 Thread via GitHub


uros-db commented on code in PR #46077:
URL: https://github.com/apache/spark/pull/46077#discussion_r1572003918


##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollationExpressionSuite.scala:
##
@@ -161,4 +162,40 @@ class CollationExpressionSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   checkEvaluation(ArrayExcept(left, right), out)
 }
   }
+
+  test("MultiLikeBase regexp expressions with collated strings") {

Review Comment:
   Agreed. Will definitely need to move this somewhere



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



  1   2   >