Re: [PR] [SPARK-47858][PYTHON][FOLLOWUP] Excluding Python magic methods from error context target [spark]

2024-04-26 Thread via GitHub


HyukjinKwon closed pull request #46215: [SPARK-47858][PYTHON][FOLLOWUP] 
Excluding Python magic methods from error context target
URL: https://github.com/apache/spark/pull/46215


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1580640494


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -164,7 +178,20 @@ class PythonStreamingSourceRunner(
       val pickledPartition: Array[Byte] = PythonWorkerUtils.readBytes(dataIn)
       pickledPartitions.append(pickledPartition)
     }
-    pickledPartitions.toArray
+    val prefetchedRecordsStatus = dataIn.readInt()
+    val iter: Option[Iterator[InternalRow]] = prefetchedRecordsStatus match {
+      case NON_EMPTY_PYARROW_RECORD_BATCHES => Some(readArrowRecordBatches())
+      case PREFETCHED_RECORDS_NOT_FOUND => None

Review Comment:
   What do we do if we hit this line?






Re: [PR] Fix mathExpressions that use StringType [spark]

2024-04-26 Thread via GitHub


uros-db commented on PR #46227:
URL: https://github.com/apache/spark/pull/46227#issuecomment-2078899048

   I think @mihailom-db created it already 
https://issues.apache.org/jira/browse/SPARK-47408





[PR] [SPARK-48007][BUILD] MsSQLServer: upgrade mssql.jdbc.version to 12.6.1.jre11 [spark]

2024-04-26 Thread via GitHub


yaooqinn opened a new pull request, #46244:
URL: https://github.com/apache/spark/pull/46244

   
   
   ### What changes were proposed in this pull request?
   
   
   This PR upgrades mssql.jdbc.version to 12.6.1.jre11, 
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc.
   
   
   ### Why are the changes needed?
   
   test dependency management
   
   ### Does this PR introduce _any_ user-facing change?
   
   no
   
   ### How was this patch tested?
   
   existing tests
   
   ### Was this patch authored or co-authored using generative AI tooling?
   no
   





Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1580604894


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+    DataSource,
+    DataSourceStreamReader,
+    InputPartition,
+    SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> "DataSourceStreamReader":
+    """
+    Fallback to simpleStreamReader() method when streamReader() is not implemented.
+    This should be invoked whenever a DataSourceStreamReader needs to be created instead of
+    invoking datasource.streamReader() directly.
+    """
+    try:
+        return datasource.streamReader(schema=schema)
+    except PySparkNotImplementedError:
+        return _SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):
+    def __init__(self, start: dict, end: dict):
+        self.start = start
+        self.end = end
+
+
+class PrefetchedCacheEntry:
+    def __init__(self, start: dict, end: dict, iterator: Iterator[Tuple]):
+        self.start = start
+        self.end = end
+        self.iterator = iterator
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+    """
+    A private class that wrap :class:`SimpleDataSourceStreamReader` in prefetch and cache pattern,
+    so that :class:`SimpleDataSourceStreamReader` can integrate with streaming engine like an
+    ordinary :class:`DataSourceStreamReader`.
+
+    current_offset tracks the latest progress of the record prefetching, it is initialized to be
+    initialOffset() when query start for the first time or initialized to be the end offset of
+    the last committed batch when query restarts.
+
+    When streaming engine calls latestOffset(), the wrapper calls read() that starts from
+    current_offset, prefetches and cache the data, then updates the current_offset to be
+    the end offset of the new data.
+
+    When streaming engine call planInputPartitions(start, end), the wrapper get the prefetched data
+    from cache and send it to JVM along with the input partitions.
+
+    When query restart, batches in write ahead offset log that has not been committed will be
+    replayed by reading data between start and end offset through read2(start, end).
+    """
+
+    def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+        self.simple_reader = simple_reader
+        self.initial_offset: Optional[dict] = None
+        self.current_offset: Optional[dict] = None
+        self.cache: List[PrefetchedCacheEntry] = []
+
+    def initialOffset(self) -> dict:
+        if self.initial_offset is None:
+            self.initial_offset = self.simple_reader.initialOffset()
+        return self.initial_offset
+
+    def latestOffset(self) -> dict:
+        # when query start for the first time, use initial offset as the start offset.

Review Comment:
   Actually, this is the hard part of implementing a prefetcher for an SS data 
source. We assume that, when the query restarts, the prefetcher can start from 
a known committed offset. Unfortunately, that is not true. You've mentioned 
that this relies on the getBatch trick, but that is only applicable to DSv1, and 
it's clearly a hack to address some specific data source. It is not a contract 
the streaming engine guarantees.
   
   We have an interface `AcceptsLatestSeenOffset` for this case (you need to 
adopt it when determining the start offset for prefetching), but it gives you 
the latest seen offset rather than the last committed offset, so Spark could 
still request an offset range before this offset. It would still work if the 
simple data source reader can serve every planned-but-not-yet-committed offset 
range without relying on the prefetcher: the prefetcher can start prefetching 
from the latest seen offset, and the earlier offset ranges are covered by the 
planned batch(es).

Re: [PR] [SPARK-47355][SQL] Use wildcard imports in CollationTypeCasts [spark]

2024-04-26 Thread via GitHub


HyukjinKwon commented on PR #46230:
URL: https://github.com/apache/spark/pull/46230#issuecomment-2078891802

   Merged to master.





Re: [PR] [SPARK-47994][SQL] Fix bug with CASE WHEN column filter push down in SQLServer [spark]

2024-04-26 Thread via GitHub


HyukjinKwon commented on PR #46231:
URL: https://github.com/apache/spark/pull/46231#issuecomment-2078890912

   cc @beliefer FYI





Re: [PR] Fix mathExpressions that use StringType [spark]

2024-04-26 Thread via GitHub


HyukjinKwon commented on PR #46227:
URL: https://github.com/apache/spark/pull/46227#issuecomment-2078892507

   Can you create a JIRA, and link it to the PR title please?





Re: [PR] [SPARK-47355][SQL] Use wildcard imports in CollationTypeCasts [spark]

2024-04-26 Thread via GitHub


HyukjinKwon closed pull request #46230: [SPARK-47355][SQL] Use wildcard imports 
in CollationTypeCasts
URL: https://github.com/apache/spark/pull/46230





[PR] [WIP][SPARK-48003][SQL] Add collation support for hll sketch aggregate [spark]

2024-04-26 Thread via GitHub


uros-db opened a new pull request, #46241:
URL: https://github.com/apache/spark/pull/46241

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   





Re: [PR] [SPARK-48004][SQL] Add WriteFilesExecBase trait for v1 write [spark]

2024-04-26 Thread via GitHub


yaooqinn commented on PR #46240:
URL: https://github.com/apache/spark/pull/46240#issuecomment-2079123777

   Thanks, merged to master





Re: [PR] [SPARK-48004][SQL] Add WriteFilesExecBase trait for v1 write [spark]

2024-04-26 Thread via GitHub


yaooqinn closed pull request #46240: [SPARK-48004][SQL] Add WriteFilesExecBase 
trait for v1 write
URL: https://github.com/apache/spark/pull/46240





Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1580604894


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+    def latestOffset(self) -> dict:
+        # when query start for the first time, use initial offset as the start offset.

Review Comment:
   Actually, this is the hard part of implementing a prefetcher for an SS data 
source. We assume that, when the query restarts, the prefetcher can start from 
a known committed offset. Unfortunately, that is not true. You've mentioned 
that this relies on the getBatch trick, but that is only applicable to DSv1, and 
it's clearly a hack to address some specific data source.
   
   We have an interface `AcceptsLatestSeenOffset` for this case (you need to 
adopt it when determining the start offset for prefetching), but it gives you 
the latest seen offset rather than the last committed offset, so Spark could 
still request an offset range before this offset. It would still work if the 
simple data source reader can serve every planned-but-not-yet-committed offset 
range without relying on the prefetcher: the prefetcher can start prefetching 
from the latest seen offset, and the earlier offset ranges are covered by the 
planned batch(es).
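   
   A minimal sketch of that idea, not the PR's implementation and written 
without any pyspark dependency. `CountingReader`, `PrefetchingWrapper`, 
`set_latest_seen_offset` and `get_batch` are hypothetical names used only for 
illustration; the assumption is that the simple reader can re-read any offset 
range deterministically through `read2(start, end)`.

```python
from typing import Dict, Iterator, List, Optional, Tuple

Offset = Dict[str, int]
Row = Tuple[int]


class CountingReader:
    """Hypothetical simple reader: emits consecutive integers, two per read()."""

    def initialOffset(self) -> Offset:
        return {"offset": 0}

    def read(self, start: Offset) -> Tuple[Iterator[Row], Offset]:
        end = {"offset": start["offset"] + 2}
        return self.read2(start, end), end

    def read2(self, start: Offset, end: Offset) -> Iterator[Row]:
        return iter([(i,) for i in range(start["offset"], end["offset"])])


class PrefetchingWrapper:
    """Hypothetical wrapper: prefetch on latest_offset(), fall back to read2()."""

    def __init__(self, reader: CountingReader):
        self.reader = reader
        self.current_offset: Optional[Offset] = None
        # Each entry is (start, end, rows) for one prefetched range.
        self.cache: List[Tuple[Offset, Offset, List[Row]]] = []

    def set_latest_seen_offset(self, offset: Offset) -> None:
        # After a restart, prefetching resumes from the latest seen offset.
        self.current_offset = offset

    def latest_offset(self) -> Offset:
        if self.current_offset is None:
            self.current_offset = self.reader.initialOffset()
        rows, end = self.reader.read(self.current_offset)
        self.cache.append((self.current_offset, end, list(rows)))
        self.current_offset = end
        return end

    def get_batch(self, start: Offset, end: Offset) -> List[Row]:
        # Walk the cache and stitch together entries that chain from start to end.
        rows: List[Row] = []
        cursor = start
        for entry_start, entry_end, entry_rows in self.cache:
            if entry_start == cursor and entry_end["offset"] <= end["offset"]:
                rows.extend(entry_rows)
                cursor = entry_end
        if cursor == end:
            return rows
        # Cache miss (for example a planned batch from before the restart):
        # re-read the range directly instead of relying on prefetched data.
        return list(self.reader.read2(start, end))
```

   In this sketch, a range that was planned before the restart never needs to 
be in the cache, because `get_batch` falls back to `read2` whenever the cached 
entries do not cover the requested range exactly.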




Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1580659454


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+    def latestOffset(self) -> dict:
+        # when query start for the first time, use initial offset as the start offset.

Review Comment:
   OK, never mind. You are handling each of these cases individually (not just 
leveraging the DSv1 trick). Your comment seems a bit confusing; the mention of 
getBatch is where I got confused.
   
   It would still be better to have fault-tolerance test(s) if we don't have any.
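   
   As a rough illustration of what such a test could check, here is a 
self-contained sketch that simulates a crash between writing the offset log and 
committing the batch. It does not use the Spark test harness; `RangeReader`, 
`run_until_crash` and `replay_after_restart` are hypothetical helpers, and the 
offset log is modeled as a plain Python list.

```python
from typing import Dict, Iterator, List, Tuple

Offset = Dict[str, int]
Row = Tuple[int]


class RangeReader:
    """Hypothetical deterministic source: row i is the tuple (i,)."""

    def initialOffset(self) -> Offset:
        return {"offset": 0}

    def read(self, start: Offset) -> Tuple[Iterator[Row], Offset]:
        end = {"offset": start["offset"] + 3}
        return self.read2(start, end), end

    def read2(self, start: Offset, end: Offset) -> Iterator[Row]:
        return iter([(i,) for i in range(start["offset"], end["offset"])])


def run_until_crash(offset_log: List[Tuple[Offset, Offset]]) -> List[Row]:
    """Plan one batch, log its range, then 'crash' before committing it."""
    reader = RangeReader()
    start = offset_log[-1][1] if offset_log else reader.initialOffset()
    rows, end = reader.read(start)
    offset_log.append((start, end))  # written ahead of processing
    return list(rows)  # any prefetched copy of these rows dies with the process


def replay_after_restart(offset_log: List[Tuple[Offset, Offset]]) -> List[Row]:
    """A fresh reader has no cache, so it replays the last logged range."""
    reader = RangeReader()
    start, end = offset_log[-1]
    return list(reader.read2(start, end))


def test_replay_matches_first_attempt() -> None:
    offset_log: List[Tuple[Offset, Offset]] = []
    first_attempt = run_until_crash(offset_log)
    replayed = replay_after_restart(offset_log)
    # The replayed batch must contain exactly the rows of the first attempt.
    assert replayed == first_attempt
    assert offset_log == [({"offset": 0}, {"offset": 3})]


test_replay_matches_first_attempt()
```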






Re: [PR] [SPARK-47993][PYTHON] Drop Python 3.8 [spark]

2024-04-26 Thread via GitHub


HyukjinKwon commented on PR #46228:
URL: https://github.com/apache/spark/pull/46228#issuecomment-2078883440

   Merged to master.





Re: [PR] [SPARK-47993][PYTHON] Drop Python 3.8 [spark]

2024-04-26 Thread via GitHub


HyukjinKwon closed pull request #46228: [SPARK-47993][PYTHON] Drop Python 3.8
URL: https://github.com/apache/spark/pull/46228





Re: [PR] [SPARK-45225][SQL][FOLLOW-UP] XML: Fix nested XSD file path resolution [spark]

2024-04-26 Thread via GitHub


HyukjinKwon commented on PR #46235:
URL: https://github.com/apache/spark/pull/46235#issuecomment-2078885747

   Merged to master.





Re: [PR] [SPARK-47858][PYTHON][FOLLOWUP] Excluding Python magic methods from error context target [spark]

2024-04-26 Thread via GitHub


HyukjinKwon commented on PR #46215:
URL: https://github.com/apache/spark/pull/46215#issuecomment-2078831120

   Merged to master.





Re: [PR] [SPARK-47927][SQL]: Fix nullability attribute in UDF decoder [spark]

2024-04-26 Thread via GitHub


eejbyfeldt commented on PR #46156:
URL: https://github.com/apache/spark/pull/46156#issuecomment-2078895726

   @cloud-fan Since you reviewed the original PR, maybe you could have a look?





Re: [PR] [WIP][SPARK-48003][SQL] Add collation support for hll sketch aggregate [spark]

2024-04-26 Thread via GitHub


09306677806 commented on PR #46241:
URL: https://github.com/apache/spark/pull/46241#issuecomment-2078978391

   bc1q2tk3c4z7zvjxhswrdpqt7xp98ea99h29zu9qyh
   
   From: Uros Bojanic ***@***.***>
   Sent: Friday, April 26, 2024 12:43:57 PM
   To: apache/spark ***@***.***>
   Cc: Subscribed ***@***.***>
   Subject: [apache/spark] [WIP][SPARK-48003][SQL] Add collation support for 
hll sketch aggregate (PR #46241)
   
   What changes were proposed in this pull request?
   Why are the changes needed?
   Does this PR introduce any user-facing change?
   How was this patch tested?
   Was this patch authored or co-authored using generative AI tooling?
   
   You can view, comment on, or merge this pull request online at:
   
 https://github.com/apache/spark/pull/46241
   
   Commit Summary
   
 *   19985e7 Initial commit
   
   File Changes
   
   (3 files)
   
 *   M common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java (13)
 *   M sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala (8)
 *   M sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala (19)
   
   Patch Links:
   
 *   https://github.com/apache/spark/pull/46241.patch
 *   https://github.com/apache/spark/pull/46241.diff
   





[PR] [SPARK-48005][PS][CONNECT][TESTS] Enable `DefaultIndexParityTests. test_index_distributed_sequence_cleanup` [spark]

2024-04-26 Thread via GitHub


zhengruifeng opened a new pull request, #46242:
URL: https://github.com/apache/spark/pull/46242

   ### What changes were proposed in this pull request?
   Enable `DefaultIndexParityTests.test_index_distributed_sequence_cleanup`
   
   
   ### Why are the changes needed?
   This test requires `sc` access, so it can be enabled in `Spark Connect with 
JVM` mode.
   
   
   ### Does this PR introduce _any_ user-facing change?
   no, test only
   
   
   ### How was this patch tested?
   ci, also manually test:
   ```
   python/run-tests -k --python-executables python3 --testnames 
'pyspark.pandas.tests.connect.indexes.test_parity_default 
DefaultIndexParityTests.test_index_distributed_sequence_cleanup'
   Running PySpark tests. Output is in 
/Users/ruifeng.zheng/Dev/spark/python/unit-tests.log
   Will test against the following Python executables: ['python3']
   Will test the following Python tests: 
['pyspark.pandas.tests.connect.indexes.test_parity_default 
DefaultIndexParityTests.test_index_distributed_sequence_cleanup']
   python3 python_implementation is CPython
   python3 version is: Python 3.12.2
   Starting test(python3): 
pyspark.pandas.tests.connect.indexes.test_parity_default 
DefaultIndexParityTests.test_index_distributed_sequence_cleanup (temp output: 
/Users/ruifeng.zheng/Dev/spark/python/target/ccd3da45-f774-4f5f-8283-a91a8ee12212/python3__pyspark.pandas.tests.connect.indexes.test_parity_default_DefaultIndexParityTests.test_index_distributed_sequence_cleanup__p9yved3e.log)
   Finished test(python3): 
pyspark.pandas.tests.connect.indexes.test_parity_default 
DefaultIndexParityTests.test_index_distributed_sequence_cleanup (16s)
   Tests passed in 16 seconds
   ```
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   no





Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1580636552


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,188 @@ def stop(self) -> None:
         ...
 
 
+class SimpleInputPartition(InputPartition):
+    def __init__(self, start: dict, end: dict):
+        self.start = start
+        self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+    """
+    A base class for simplified streaming data source readers. Compared to DataSourceStreamReader,
+    SimpleDataSourceStreamReader doesn't require planning data partitioning. Also, the read api of
+    SimpleDataSourceStreamReader allows reading data and planning the latest offset at the same time.
+
+    .. versionadded: 4.0.0
+    """
+
+    def initialOffset(self) -> dict:
+        """
+        Return the initial offset of the streaming data source.
+        A new streaming query starts reading data from the initial offset.
+        If Spark is restarting an existing query, it will restart from the check-pointed offset
+        rather than the initial one.
+
+        Returns
+        -------
+        dict
+            A dict or recursive dict whose key and value are primitive types, which includes
+            Integer, String and Boolean.
+
+        Examples
+        --------
+        >>> def initialOffset(self):
+        ...     return {"parititon-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}}
+        """
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": "initialOffset"},
+        )
+
+    def read(self, start: dict) -> (Iterator[Tuple], dict):
+        """
+        Read all available data from specified start offset and return the offset that next read attempt
+        starts from.
+
+        Parameters
+        ----------
+        start : dict
+            The start offset to start reading from.
+
+        Returns
+        -------
+        A tuple of an iterator of :class:`Tuple` and a dict\\s
+            The iterator contains all the available records after start offset.
+            The dict is the end of this read attempt and the start of next read attempt.
+        """
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": "read"},
+        )
+
+    def read2(self, start: dict, end: dict) -> Iterator[Tuple]:

Review Comment:
   The name itself might be OK. We could make both method names 
self-descriptive (not just `read`), but if we prefer shorter names, it may be 
OK for one of them to be "read".
   
   I see a bigger issue in the implementation. Let's address that first.
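
To make the `read` / `read2` contract described in the docstrings above concrete, here is a minimal, self-contained sketch. It does not import PySpark; `CounterStreamReader` and its two-rows-per-read behaviour are hypothetical names and choices made for illustration only, not part of the PR.

```python
from typing import Iterator, Tuple


class CounterStreamReader:
    """Hypothetical reader: emits consecutive integers, two per read attempt."""

    def initialOffset(self) -> dict:
        # Offsets are plain dicts of primitive values, as the docstring requires.
        return {"index": 0}

    def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
        # Read everything "available" after start and report where the next read begins.
        begin = start["index"]
        end = {"index": begin + 2}
        return iter([(i,) for i in range(begin, end["index"])]), end

    def read2(self, start: dict, end: dict) -> Iterator[Tuple]:
        # Deterministically re-read a known range, e.g. when replaying a batch.
        return iter([(i,) for i in range(start["index"], end["index"])])


reader = CounterStreamReader()
rows, next_offset = reader.read(reader.initialOffset())
first_batch = list(rows)
replayed = list(reader.read2(reader.initialOffset(), next_offset))
```

The point of the sketch is that `read` decides the end offset itself, while the range-based method must return the same rows for a given `(start, end)` pair, which is why a more descriptive name for one of the two may be worth considering.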






Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


HyukjinKwon commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1580671684


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+    DataSource,
+    DataSourceStreamReader,
+    InputPartition,
+    SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> "DataSourceStreamReader":
+    """
+    Fallback to simpleStreamReader() method when streamReader() is not implemented.
+    This should be invoked whenever a DataSourceStreamReader needs to be created instead of
+    invoking datasource.streamReader() directly.
+    """
+    try:
+        return datasource.streamReader(schema=schema)
+    except PySparkNotImplementedError:
+        return _SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):

Review Comment:
   Maybe we should at least put it in the user guide 
(https://spark.apache.org/docs/latest/api/python/user_guide/index.html). 
Usually we don't document developer APIs, but I think this one is important.






Re: [PR] [SPARK-47939][SQL] Implement a new Analyzer rule to move ParameterizedQuery inside ExplainCommand and DescribeQueryCommand [spark]

2024-04-26 Thread via GitHub


cloud-fan commented on code in PR #46209:
URL: https://github.com/apache/spark/pull/46209#discussion_r1580908359


##
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala:
##
@@ -4715,6 +4715,145 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
 val df6 = df3.join(df2, col("df3.zaak_id") === col("df2.customer_id"), 
"outer")
 df5.crossJoin(df6)
   }
+
+  test("SPARK-47939: Describe should work with parameterized queries") {
+checkAnswer(
+  spark.sql("describe select ?", Array(1)),
+  Array(
+Row("1", "int", null)
+  )
+)
+checkAnswer(
+  spark.sql("describe select :first", Map("first" -> 1)),
+  Array(
+Row("1", "int", null)
+  )
+)
+
+checkAnswer(
+  spark.sql("describe select * from values (?, ?) t(x, y)", Array(1, "a")),
+  Array(
+Row("x", "int", null),
+Row("y", "string", null)
+  )
+)
+checkAnswer(
+  spark.sql(
+"describe select * from values (:first, :second) t(x, y)",
+Map("first" -> 1, "second" -> "a")
+  ),
+  Array(
+Row("x", "int", null),
+Row("y", "string", null)
+  )
+)
+  }
+
+  test("SPARK-47939: Explain should work with parameterized queries") {
+def checkQueryPlan(df: DataFrame, plan: String): Unit = assert(
+  df.collect()
+.map(_.getString(0))
+.map(_.replaceAll("#[0-9]+", "#N"))
+.sameElements(Array(plan))
+)
+
+checkQueryPlan(
+  spark.sql("explain select ?", Array(1)),
+  """== Physical Plan ==

Review Comment:
   nit: use `stripMargin`
   ```
   """
 |line1
 |line2
   """.stripMargin
   ```






[PR] [SPARK-48004][SQL] Add WriteFilesExecBase trait for v1 write [spark]

2024-04-26 Thread via GitHub


ulysses-you opened a new pull request, #46240:
URL: https://github.com/apache/spark/pull/46240

   
   
   ### What changes were proposed in this pull request?
   
   This PR adds a new trait `WriteFilesExecBase` for v1 write, so that 
downstream projects can inherit from `WriteFilesExecBase` rather than 
`WriteFilesExec`. The reason is that inheriting from a `case class` is bad 
practice in Scala.
   
   ### Why are the changes needed?
   
   Make downstream projects easier to develop.
   
   ### Does this PR introduce _any_ user-facing change?
   
   no
   
   ### How was this patch tested?
   
   Pass CI
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   no
   





Re: [PR] [SPARK-45225][SQL][FOLLOW-UP] XML: Fix nested XSD file path resolution [spark]

2024-04-26 Thread via GitHub


HyukjinKwon closed pull request #46235: [SPARK-45225][SQL][FOLLOW-UP] XML: Fix 
nested XSD file path resolution
URL: https://github.com/apache/spark/pull/46235





Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1580659454


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+    DataSource,
+    DataSourceStreamReader,
+    InputPartition,
+    SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> "DataSourceStreamReader":
+    """
+    Fallback to simpleStreamReader() method when streamReader() is not implemented.
+    This should be invoked whenever a DataSourceStreamReader needs to be created instead of
+    invoking datasource.streamReader() directly.
+    """
+    try:
+        return datasource.streamReader(schema=schema)
+    except PySparkNotImplementedError:
+        return _SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):
+    def __init__(self, start: dict, end: dict):
+        self.start = start
+        self.end = end
+
+
+class PrefetchedCacheEntry:
+    def __init__(self, start: dict, end: dict, iterator: Iterator[Tuple]):
+        self.start = start
+        self.end = end
+        self.iterator = iterator
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+    """
+    A private class that wraps :class:`SimpleDataSourceStreamReader` in a prefetch and cache pattern,
+    so that :class:`SimpleDataSourceStreamReader` can integrate with the streaming engine like an
+    ordinary :class:`DataSourceStreamReader`.
+
+    current_offset tracks the latest progress of the record prefetching. It is initialized to
+    initialOffset() when the query starts for the first time, or to the end offset of
+    the last committed batch when the query restarts.
+
+    When the streaming engine calls latestOffset(), the wrapper calls read() starting from
+    current_offset, prefetches and caches the data, then updates current_offset to be
+    the end offset of the new data.
+
+    When the streaming engine calls planInputPartitions(start, end), the wrapper gets the prefetched
+    data from the cache and sends it to the JVM along with the input partitions.
+
+    When the query restarts, batches in the write ahead offset log that have not been committed will
+    be replayed by reading data between the start and end offsets through read2(start, end).
+    """
+
+    def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+        self.simple_reader = simple_reader
+        self.initial_offset: Optional[dict] = None
+        self.current_offset: Optional[dict] = None
+        self.cache: List[PrefetchedCacheEntry] = []
+
+    def initialOffset(self) -> dict:
+        if self.initial_offset is None:
+            self.initial_offset = self.simple_reader.initialOffset()
+        return self.initial_offset
+
+    def latestOffset(self) -> dict:
+        # when the query starts for the first time, use the initial offset as the start offset.
Review Comment:
   OK, never mind. You are handling each of these cases individually (not just 
leveraging the DSv1 trick). Your comment was a bit confusing: the mention of 
getBatch is where I first got confused.






Re: [PR] [SPARK-47440][SQL][FOLLOWUP] Reenable predicate pushdown for syntax with boolean comparison in MsSqlServer [spark]

2024-04-26 Thread via GitHub


yaooqinn commented on PR #46236:
URL: https://github.com/apache/spark/pull/46236#issuecomment-2078901765

   I guess we didn't find the proper way to both fix the syntax issue and 
retain the ability to pushdown at that time.





[PR] [SPARK-48006][SQL]add SortOrder for window function which has no orde… [spark]

2024-04-26 Thread via GitHub


guixiaowen opened a new pull request, #46243:
URL: https://github.com/apache/spark/pull/46243

   
   
   ### What changes were proposed in this pull request?
   I am migrating from Hive SQL to Spark SQL.
   

   
   In Hive SQL
   

   
   hive> explain select *,row_number() over (partition by day) rn from 
testdb.zeropart_db;
   
   OK
   Explain
   

   
   In Spark SQL
   
   spark-sql> explain select *,row_number() over (partition by age ) rn  from 
testdb.zeropart_db;
   
   plan
   
   == Physical Plan ==
   
   org.apache.spark.sql.AnalysisException: Window function row_number() 
requires window to be ordered, please add ORDER BY clause. For example SELECT 
row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY 
window_ordering) from table
   
   Time taken: 0.172 seconds, Fetched 1 row(s)
   

   
   For better compatibility during migration, new parameters are added so that 
Spark SQL can match the behavior of Hive SQL.
   
   
   ### Why are the changes needed?
   For better compatibility with migration. 
   
   
   ### Does this PR introduce _any_ user-facing change?
   before this pr:
   
   spark-sql> explain select *,row_number() over (partition by age ) rn  from 
testdb.zeropart_db;
   
   plan
   
   == Physical Plan ==
   
   org.apache.spark.sql.AnalysisException: Window function row_number() 
requires window to be ordered, please add ORDER BY clause. For example SELECT 
row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY 
window_ordering) from table
   
   Time taken: 0.172 seconds, Fetched 1 row(s)
   
   after this pr:
   
   spark-sql> explain select *,row_number() over (partition by age ) rn  from 
testdb.zeropart_db;
   plan
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- Window [row_number() windowspecdefinition(age#37, age#37 ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
rn#30], [age#37], [age#37 ASC NULLS FIRST]
  +- Sort [age#37 ASC NULLS FIRST, age#37 ASC NULLS FIRST], false, 0
 +- Exchange hashpartitioning(age#37, 1000), ENSURE_REQUIREMENTS, 
[id=#53]
+- Scan hive testdb.zeropart_db [age#37, sex#38, name#39, day#40], 
HiveTableRelation [`bigdata_qa`.`zeropart_db`, 
org.apache.hadoop.hive.ql.io.orc.OrcSerde, Data Cols: [age#37, sex#38, 
name#39], Partition Cols: [day#40]]
   
   
   Time taken: 0.154 seconds, Fetched 1 row(s)
   
   
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   





Re: [PR] [SPARK-47408][SQL] Fix mathExpressions that use StringType [spark]

2024-04-26 Thread via GitHub


cloud-fan commented on code in PR #46227:
URL: https://github.com/apache/spark/pull/46227#discussion_r1580903400


##
sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.collection.immutable.Seq
+
+import org.apache.spark.sql.internal.SqlApiConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StringType
+
+// scalastyle:off nonascii
+class CollationSQLExpressionsSuite
+  extends QueryTest
+with SharedSparkSession {
+
+  test("Conv expression with collation") {
+// Supported collations
+case class ConvTestCase(
+num: String,
+from_base: String,
+to_base: String,
+collationName: String,
+result: String)
+
+val testCases = Seq(
+  ConvTestCase("100", "2", "10", "UTF8_BINARY", "4"),
+  ConvTestCase("100", "2", "10", "UTF8_BINARY_LCASE", "4"),
+  ConvTestCase("100", "2", "10", "UNICODE", "4"),
+  ConvTestCase("100", "2", "10", "UNICODE_CI", "4")
+)
+testCases.foreach(t => {
+  val query =
+s"""
+   |select conv(collate('${t.num}', '${t.collationName}'), 
${t.from_base}, ${t.to_base})

Review Comment:
   why do we add end-to-end tests but not unit tests?






Re: [PR] [SPARK-47409][SQL] Add support for collation for StringTrim type of functions/expressions [spark]

2024-04-26 Thread via GitHub


davidm-db commented on code in PR #46206:
URL: https://github.com/apache/spark/pull/46206#discussion_r1580662930


##
common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java:
##
@@ -306,6 +308,258 @@ public static int execICU(final UTF8String string, final 
UTF8String substring,
 }
   }
 
+  public static class StringTrim {
+public static UTF8String exec(
+final UTF8String srcString,
+final int collationId) {
+  CollationFactory.Collation collation = 
CollationFactory.fetchCollation(collationId);
+  if (collation.supportsBinaryEquality) {
+return execBinary(srcString);
+  } else if (collation.supportsLowercaseEquality) {
+return execLowercase(srcString);
+  } else {
+return execICU(srcString, collationId);
+  }
+}
+public static UTF8String exec(
+final UTF8String srcString,
+final UTF8String trimString,
+final int collationId) {
+  CollationFactory.Collation collation = 
CollationFactory.fetchCollation(collationId);
+  if (collation.supportsBinaryEquality) {
+return execBinary(srcString, trimString);
+  } else if (collation.supportsLowercaseEquality) {
+return execLowercase(srcString, trimString);
+  } else {
+return execICU(srcString, trimString, collationId);
+  }
+}
+public static String genCode(
+final String srcString,
+final int collationId) {
+  CollationFactory.Collation collation = 
CollationFactory.fetchCollation(collationId);
+  String expr = "CollationSupport.StringTrim.exec";
+  if (collation.supportsBinaryEquality) {
+return String.format(expr + "Binary(%s)", srcString);
+  } else if (collation.supportsLowercaseEquality) {
+return String.format(expr + "Lowercase(%s)", srcString);
+  } else {
+return String.format(expr + "ICU(%s, %d)", srcString, collationId);
+  }
+}
+public static String genCode(
+final String srcString,
+final String trimString,
+final int collationId) {
+  CollationFactory.Collation collation = 
CollationFactory.fetchCollation(collationId);
+  String expr = "CollationSupport.StringTrim.exec";
+  if (collation.supportsBinaryEquality) {
+return String.format(expr + "Binary(%s, %s)", srcString, trimString);
+  } else if (collation.supportsLowercaseEquality) {
+return String.format(expr + "Lowercase(%s, %s)", srcString, 
trimString);
+  } else {
+return String.format(expr + "ICU(%s, %s, %d)", srcString, trimString, 
collationId);
+  }
+}
+public static UTF8String execBinary(
+final UTF8String srcString) {
+  return srcString.trim();
+}
+public static UTF8String execBinary(
+final UTF8String srcString,
+final UTF8String trimString) {
+  return srcString.trim(trimString);
+}
+public static UTF8String execLowercase(
+final UTF8String srcString) {
+  return srcString.trim();

Review Comment:
   Mihailo is right, this trims only spaces, which is the same behavior for 
lowercase collations.






Re: [PR] [SPARK-47440][SQL][FOLLOWUP] Reenable predicate pushdown for syntax with boolean comparison in MsSqlServer [spark]

2024-04-26 Thread via GitHub


HyukjinKwon commented on PR #46236:
URL: https://github.com/apache/spark/pull/46236#issuecomment-2078889259

   qq why did we disable it before?





Re: [PR] [SPARK-48004][SQL] Add WriteFilesExecBase trait for v1 write [spark]

2024-04-26 Thread via GitHub


ulysses-you commented on PR #46240:
URL: https://github.com/apache/spark/pull/46240#issuecomment-2078886099

   cc @cloud-fan @yaooqinn thank you





Re: [PR] [SPARK-47989][SQL] MsSQLServer: Fix the scope of spark.sql.legacy.mssqlserver.numericMapping.enabled [spark]

2024-04-26 Thread via GitHub


yaooqinn commented on PR #46223:
URL: https://github.com/apache/spark/pull/46223#issuecomment-2078918918

   Thank you @dongjoon-hyun 





Re: [PR] [SPARK-47914][SQL] Do not display the splits parameter in Range [spark]

2024-04-26 Thread via GitHub


guixiaowen commented on PR #46136:
URL: https://github.com/apache/spark/pull/46136#issuecomment-2079061228

   > > Mind checking the test failures?
   > 
   > ok. I will check it.
   
   @HyukjinKwon hi, do you have any questions about this?





[PR] [SPARK-48008][WIP] Support UDAFs in Spark Connect [spark]

2024-04-26 Thread via GitHub


xupefei opened a new pull request, #46245:
URL: https://github.com/apache/spark/pull/46245

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   





[PR] [SPARK-47968][SQL] MsSQLServer: Map datatimeoffset to TimestampType [spark]

2024-04-26 Thread via GitHub


yaooqinn opened a new pull request, #46239:
URL: https://github.com/apache/spark/pull/46239

   
   
   ### What changes were proposed in this pull request?
   
   
   This PR changes the `datetimeoffset -> StringType` mapping to a 
`datetimeoffset -> TimestampType` mapping, as we use `mssql-jdbc` for Microsoft 
SQL Server. `spark.sql.legacy.mssqlserver.datetimeoffsetMapping.enabled` is 
provided for users to restore the old behavior.
   
   ### Why are the changes needed?
   
   
   With the official SQL Server client, it's more reasonable to read it as 
TimestampType, which is also more consistent with other JDBC data sources.
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   Yes, (please refer to the first section)
   ### How was this patch tested?
   
   new tests 
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   no





[PR] [SPARK-47440][SQL][FOLLOWUP] Reenable predicate pushdown for syntax with boolean comparison in MsSqlServer [spark]

2024-04-26 Thread via GitHub


yaooqinn opened a new pull request, #46236:
URL: https://github.com/apache/spark/pull/46236

   
   
   ### What changes were proposed in this pull request?
   
   
   In https://github.com/apache/spark/pull/45564, predicate pushdown with 
boolean comparison syntax in MsSqlServer was disabled because MsSqlServer does 
not support that syntax.
   
   In this PR, we reenable this feature by converting the boolean comparison to 
an equivalent comparison on 1 and 0.
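
As a rough illustration of the idea (not the dialect code in this PR), a boolean predicate can be turned into a 1/0 value with a standard `CASE` expression so that two predicates can be compared in T-SQL. The helper names `bool_to_bit` and `compare_booleans` are hypothetical, and this sketch maps an UNKNOWN (NULL) predicate to 0, which may differ from the semantics the actual change preserves.

```python
def bool_to_bit(predicate: str) -> str:
    # Wrap a boolean predicate so it yields 1 or 0, which T-SQL can compare.
    return f"CASE WHEN {predicate} THEN 1 ELSE 0 END"


def compare_booleans(left: str, op: str, right: str) -> str:
    # Build "<left as 1/0> <op> <right as 1/0>" for operators such as =, <>, < and >.
    return f"({bool_to_bit(left)}) {op} ({bool_to_bit(right)})"


rewritten = compare_booleans("a > 1", "=", "b < 2")
```

Here `rewritten` is a string that compares two integer-valued expressions, a form SQL Server accepts where a direct comparison of two boolean expressions is rejected.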
   
   ### Why are the changes needed?
   
   
   Avoid performance regressions
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   no
   
   ### How was this patch tested?
   
   
   existing test
   
   ```
   [info] MsSqlServerIntegrationSuite:
   [info] - SPARK-47440: SQLServer does not support boolean expression in 
binary comparison (2 seconds, 206 milliseconds)
   ```
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   no





[PR] [SPARK-48001][CORE] Remove unused `private implicit def arrayToArrayWritable` from `SparkContext` [spark]

2024-04-26 Thread via GitHub


LuciferYang opened a new pull request, #46238:
URL: https://github.com/apache/spark/pull/46238

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   





Re: [PR] [SPARK-47986][CONNECT][PYTHON] Unable to create a new session when the default session is closed by the server [spark]

2024-04-26 Thread via GitHub


zhengruifeng closed pull request #46221: [SPARK-47986][CONNECT][PYTHON] Unable 
to create a new session when the default session is closed by the server
URL: https://github.com/apache/spark/pull/46221





Re: [PR] [SPARK-47986][CONNECT][PYTHON] Unable to create a new session when the default session is closed by the server [spark]

2024-04-26 Thread via GitHub


zhengruifeng commented on PR #46221:
URL: https://github.com/apache/spark/pull/46221#issuecomment-2078787886

   thanks, merged to master





Re: [PR] [SPARK-47409][SQL] Add support for collation for StringTrim type of functions/expressions [spark]

2024-04-26 Thread via GitHub


mihailom-db commented on code in PR #46206:
URL: https://github.com/apache/spark/pull/46206#discussion_r1580518571


##
common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java:
##
@@ -306,6 +308,258 @@ public static int execICU(final UTF8String string, final 
UTF8String substring,
 }
   }
 
+  public static class StringTrim {
+public static UTF8String exec(
+final UTF8String srcString,
+final int collationId) {
+  CollationFactory.Collation collation = 
CollationFactory.fetchCollation(collationId);
+  if (collation.supportsBinaryEquality) {
+return execBinary(srcString);
+  } else if (collation.supportsLowercaseEquality) {
+return execLowercase(srcString);
+  } else {
+return execICU(srcString, collationId);
+  }
+}
+public static UTF8String exec(
+final UTF8String srcString,
+final UTF8String trimString,
+final int collationId) {
+  CollationFactory.Collation collation = 
CollationFactory.fetchCollation(collationId);
+  if (collation.supportsBinaryEquality) {
+return execBinary(srcString, trimString);
+  } else if (collation.supportsLowercaseEquality) {
+return execLowercase(srcString, trimString);
+  } else {
+return execICU(srcString, trimString, collationId);
+  }
+}
+public static String genCode(
+final String srcString,
+final int collationId) {
+  CollationFactory.Collation collation = 
CollationFactory.fetchCollation(collationId);
+  String expr = "CollationSupport.StringTrim.exec";
+  if (collation.supportsBinaryEquality) {
+return String.format(expr + "Binary(%s)", srcString);
+  } else if (collation.supportsLowercaseEquality) {
+return String.format(expr + "Lowercase(%s)", srcString);
+  } else {
+return String.format(expr + "ICU(%s, %d)", srcString, collationId);
+  }
+}
+public static String genCode(
+final String srcString,
+final String trimString,
+final int collationId) {
+  CollationFactory.Collation collation = 
CollationFactory.fetchCollation(collationId);
+  String expr = "CollationSupport.StringTrim.exec";
+  if (collation.supportsBinaryEquality) {
+return String.format(expr + "Binary(%s, %s)", srcString, trimString);
+  } else if (collation.supportsLowercaseEquality) {
+return String.format(expr + "Lowercase(%s, %s)", srcString, 
trimString);
+  } else {
+return String.format(expr + "ICU(%s, %s, %d)", srcString, trimString, 
collationId);
+  }
+}
+public static UTF8String execBinary(
+final UTF8String srcString) {
+  return srcString.trim();
+}
+public static UTF8String execBinary(
+final UTF8String srcString,
+final UTF8String trimString) {
+  return srcString.trim(trimString);
+}
+public static UTF8String execLowercase(
+final UTF8String srcString) {
+  return srcString.trim();

Review Comment:
   Shouldn't this be for a space character trim? ' ' is the same for lowercase?
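
One way to read the question above: trimming with no `trimString` only removes the space character, and a space is unchanged by lowercasing, so the binary and lowercase paths should agree in that case and can only differ once an explicit `trimString` is given. A minimal Python sketch of that reasoning follows; `trim_binary` and `trim_lowercase` are hypothetical helpers that model the idea with simple per-character lowercasing, not Spark's `UTF8String` API or the PR's implementation.

```python
# Hypothetical helpers; they model the idea only, not Spark's UTF8String API.

def trim_binary(src: str, trim_chars: str = " ") -> str:
    """Strip leading/trailing characters that match trim_chars exactly."""
    return src.strip(trim_chars)


def trim_lowercase(src: str, trim_chars: str = " ") -> str:
    """Strip leading/trailing characters that match trim_chars ignoring case."""
    lowered = set(trim_chars.lower())
    start, end = 0, len(src)
    # Advance past leading characters whose lowercase form is in the trim set.
    while start < end and src[start].lower() in lowered:
        start += 1
    # Walk back past trailing characters whose lowercase form is in the trim set.
    while end > start and src[end - 1].lower() in lowered:
        end -= 1
    return src[start:end]


# Default trim: a space is case-invariant, so both paths agree.
default_binary = trim_binary("  asd  ")
default_lowercase = trim_lowercase("  asd  ")

# Explicit trim string: only here does case-insensitivity change the result.
explicit_binary = trim_binary("XxasdxX", "x")
explicit_lowercase = trim_lowercase("XxasdxX", "x")
```

Under these assumptions, the two default-trim results are identical, while the explicit `"x"` case leaves the input untouched on the binary path and trims it on the lowercase path.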






Re: [PR] [SPARK-47409][SQL] Add support for collation for StringTrim type of functions/expressions [spark]

2024-04-26 Thread via GitHub


mihailom-db commented on code in PR #46206:
URL: https://github.com/apache/spark/pull/46206#discussion_r1580524717


##
common/unsafe/src/test/java/org/apache/spark/unsafe/types/CollationSupportSuite.java:
##
@@ -528,6 +528,235 @@ public void testFindInSet() throws SparkException {
 assertFindInSet("İo", "ab,i̇o,12", "UNICODE_CI", 2);
   }
 
+  private void assertStringTrim(
+  String collation,
+  String sourceString,
+  String trimString,
+  String expectedResultString) throws SparkException {
+int collationId = CollationFactory.collationNameToId(collation);
+String result;
+
+if (trimString == null) {
+  result = CollationSupport.StringTrim.exec(
+UTF8String.fromString(sourceString), collationId).toString();
+} else {
+  result = CollationSupport.StringTrim.exec(
+UTF8String.fromString(sourceString), 
UTF8String.fromString(trimString), collationId).toString();
+}
+
+assertEquals(expectedResultString, result);
+  }
+
+  private void assertStringTrimLeft(
+  String collation,
+  String sourceString,
+  String trimString,
+  String expectedResultString) throws SparkException {
+int collationId = CollationFactory.collationNameToId(collation);
+String result;
+
+if (trimString == null) {
+  result = CollationSupport.StringTrimLeft.exec(
+UTF8String.fromString(sourceString), collationId).toString();
+} else {
+  result = CollationSupport.StringTrimLeft.exec(
+UTF8String.fromString(sourceString), 
UTF8String.fromString(trimString), collationId).toString();
+}
+
+assertEquals(expectedResultString, result);
+  }
+
+  private void assertStringTrimRight(
+  String collation,
+  String sourceString,
+  String trimString,
+  String expectedResultString) throws SparkException {
+int collationId = CollationFactory.collationNameToId(collation);
+String result;
+
+if (trimString == null) {
+  result = CollationSupport.StringTrimRight.exec(
+UTF8String.fromString(sourceString), collationId).toString();
+} else {
+  result = CollationSupport.StringTrimRight.exec(
+UTF8String.fromString(sourceString), 
UTF8String.fromString(trimString), collationId).toString();
+}
+
+assertEquals(expectedResultString, result);
+  }
+
+  @Test
+  public void testStringTrim() throws SparkException {
+assertStringTrim("UTF8_BINARY", "asd", null, "asd");
+assertStringTrim("UTF8_BINARY", "  asd  ", null, "asd");
+assertStringTrim("UTF8_BINARY", " a世a ", null, "a世a");
+assertStringTrim("UTF8_BINARY", "asd", "x", "asd");
+assertStringTrim("UTF8_BINARY", "xxasdxx", "x", "asd");
+assertStringTrim("UTF8_BINARY", "xa世ax", "x", "a世a");
+
+assertStringTrimLeft("UTF8_BINARY", "asd", null, "asd");
+assertStringTrimLeft("UTF8_BINARY", "  asd  ", null, "asd  ");
+assertStringTrimLeft("UTF8_BINARY", " a世a ", null, "a世a ");
+assertStringTrimLeft("UTF8_BINARY", "asd", "x", "asd");
+assertStringTrimLeft("UTF8_BINARY", "xxasdxx", "x", "asdxx");
+assertStringTrimLeft("UTF8_BINARY", "xa世ax", "x", "a世ax");
+
+assertStringTrimRight("UTF8_BINARY", "asd", null, "asd");
+assertStringTrimRight("UTF8_BINARY", "  asd  ", null, "  asd");
+assertStringTrimRight("UTF8_BINARY", " a世a ", null, " a世a");
+assertStringTrimRight("UTF8_BINARY", "asd", "x", "asd");
+assertStringTrimRight("UTF8_BINARY", "xxasdxx", "x", "xxasd");
+assertStringTrimRight("UTF8_BINARY", "xa世ax", "x", "xa世a");
+
+assertStringTrim("UTF8_BINARY_LCASE", "asd", null, "asd");
+assertStringTrim("UTF8_BINARY_LCASE", "  asd  ", null, "asd");
+assertStringTrim("UTF8_BINARY_LCASE", " a世a ", null, "a世a");
+assertStringTrim("UTF8_BINARY_LCASE", "asd", "x", "asd");
+assertStringTrim("UTF8_BINARY_LCASE", "xxasdxx", "x", "asd");
+assertStringTrim("UTF8_BINARY_LCASE", "xa世ax", "x", "a世a");
+
+assertStringTrimLeft("UTF8_BINARY_LCASE", "asd", null, "asd");
+assertStringTrimLeft("UTF8_BINARY_LCASE", "  asd  ", null, "asd  ");
+assertStringTrimLeft("UTF8_BINARY_LCASE", " a世a ", null, "a世a ");
+assertStringTrimLeft("UTF8_BINARY_LCASE", "asd", "x", "asd");
+assertStringTrimLeft("UTF8_BINARY_LCASE", "xxasdxx", "x", "asdxx");
+assertStringTrimLeft("UTF8_BINARY_LCASE", "xa世ax", "x", "a世ax");
+
+assertStringTrimRight("UTF8_BINARY_LCASE", "asd", null, "asd");
+assertStringTrimRight("UTF8_BINARY_LCASE", "  asd  ", null, "  asd");
+assertStringTrimRight("UTF8_BINARY_LCASE", " a世a ", null, " a世a");
+assertStringTrimRight("UTF8_BINARY_LCASE", "asd", "x", "asd");
+assertStringTrimRight("UTF8_BINARY_LCASE", "xxasdxx", "x", "xxasd");
+assertStringTrimRight("UTF8_BINARY_LCASE", "xa世ax", "x", "xa世a");
+
+assertStringTrim("UTF8_BINARY_LCASE", "asd", null, "asd");
+assertStringTrim("UTF8_BINARY_LCASE", "  asd  ", null, "asd");
+assertStringTrim("UTF8_BINARY_LCASE", " a世a ", 

Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


HeartSaVioR commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1580561802


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +494,103 @@ def stop(self) -> None:
 ...
 
 
+class SimpleDataSourceStreamReader(ABC):
+"""
+A base class for simplified streaming data source readers.
+Compared to :class:`DataSourceStreamReader`, 
:class:`SimpleDataSourceStreamReader` doesn't
+require planning data partition. Also, the read api of 
:class:`SimpleDataSourceStreamReader`
+allows reading data and planning the latest offset at the same time.
+
+Because  :class:`SimpleDataSourceStreamReader` read records in Spark 
driver node to determine
+end offset of each batch without partitioning, it is only supposed to be 
used in
+lightweight use cases where input rate and batch size is small.
+Use :class:`DataSourceStreamReader` when read throughput is high and can't 
be handled
+by a single process.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
+"""
+Read all available data from start offset and return the offset that 
next read attempt
+starts from.
+
+Parameters
+--
+start : dict
+The start offset to start reading from.
+
+Returns
+---
+A :class:`Tuple` of an iterator of :class:`Tuple` and a dict\\s
+The iterator contains all the available records after start offset.
+The dict is the end offset of this read attempt and the start of 
next read attempt.
+"""
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "read"},
+)
+
+def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
+"""
+Read all available data from specific start offset and end offset.
+This is invoked during failure recovery to re-read a batch 
deterministically
+in order to achieve exactly once.

Review Comment:
   For stateful workloads, this is not only about exactly-once but also about 
correctness. Maybe we could just stop at "deterministic batch execution" 
rather than elaborating further.
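
To illustrate what "deterministic batch execution" asks of an implementation, here is a minimal sketch, assuming an in-memory source. `InMemoryReader` is a hypothetical class that only borrows the method names from the diff above; it does not import PySpark or subclass `SimpleDataSourceStreamReader`.

```python
from typing import Iterator, List, Tuple


class InMemoryReader:
    """Hypothetical reader over an append-only list (an assumption for illustration)."""

    def __init__(self, records: List[str]) -> None:
        self.records = list(records)

    def initialOffset(self) -> dict:
        return {"index": 0}

    def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
        # Return everything available now, plus the offset the next read starts from.
        begin = start["index"]
        end = len(self.records)
        return iter([(r,) for r in self.records[begin:end]]), {"index": end}

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        # Replay exactly the slice that was planned, ignoring newer data.
        return iter([(r,) for r in self.records[start["index"]:end["index"]]])


reader = InMemoryReader(["a", "b", "c"])
rows, end_offset = reader.read(reader.initialOffset())
first = list(rows)

# New data arrives after the batch was planned; a replay must not pick it up.
reader.records.append("d")
replayed = list(reader.readBetweenOffsets(reader.initialOffset(), end_offset))
```

The replay returns the same rows as the original read even though the source has grown, which is the property a stateful query relies on for correct results after recovery.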




Re: [PR] [SPARK-47999] Improve logging around snapshot creation and adding/removing entries from state cache map [spark]

2024-04-26 Thread via GitHub


HeartSaVioR commented on PR #46233:
URL: https://github.com/apache/spark/pull/46233#issuecomment-2078732698

   Thanks! Merging to master.





Re: [PR] [SPARK-47955][SQL] Improve `DeduplicateRelations` performance [spark]

2024-04-26 Thread via GitHub


beliefer commented on code in PR #46183:
URL: https://github.com/apache/spark/pull/46183#discussion_r1580583432


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##
@@ -38,28 +38,29 @@ case class RelationWrapper(cls: Class[_], outputAttrIds: 
Seq[Long])
 object DeduplicateRelations extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
 val newPlan = renewDuplicatedRelations(mutable.HashSet.empty, plan)._1
-if (newPlan.find(p => p.resolved && p.missingInput.nonEmpty).isDefined) {
-  // Wait for `ResolveMissingReferences` to resolve missing attributes 
first
-  return newPlan
-}
+
+def noMissingInput(p: LogicalPlan) = !p.exists(_.missingInput.nonEmpty)
+
 newPlan.resolveOperatorsUpWithPruning(
   _.containsAnyPattern(JOIN, LATERAL_JOIN, AS_OF_JOIN, INTERSECT, EXCEPT, 
UNION, COMMAND),
   ruleId) {
   case p: LogicalPlan if !p.childrenResolved => p
   // To resolve duplicate expression IDs for Join.
-  case j @ Join(left, right, _, _, _) if !j.duplicateResolved =>
+  case j @ Join(left, right, _, _, _) if !j.duplicateResolved && 
noMissingInput(right) =>
 j.copy(right = dedupRight(left, right))
   // Resolve duplicate output for LateralJoin.
-  case j @ LateralJoin(left, right, _, _) if right.resolved && 
!j.duplicateResolved =>
+  case j @ LateralJoin(left, right, _, _)
+  if right.resolved && !j.duplicateResolved && 
noMissingInput(right.plan) =>
 j.copy(right = right.withNewPlan(dedupRight(left, right.plan)))
   // Resolve duplicate output for AsOfJoin.
-  case j @ AsOfJoin(left, right, _, _, _, _, _) if !j.duplicateResolved =>
+  case j @ AsOfJoin(left, right, _, _, _, _, _)
+  if !j.duplicateResolved && noMissingInput(right) =>
 j.copy(right = dedupRight(left, right))
   // intersect/except will be rewritten to join at the beginning of 
optimizer. Here we need to
   // deduplicate the right side plan, so that we won't produce an invalid 
self-join later.
-  case i @ Intersect(left, right, _) if !i.duplicateResolved =>
+  case i @ Intersect(left, right, _) if !i.duplicateResolved && 
noMissingInput(right) =>
 i.copy(right = dedupRight(left, right))
-  case e @ Except(left, right, _) if !e.duplicateResolved =>
+  case e @ Except(left, right, _) if !e.duplicateResolved && 
noMissingInput(right) =>
 e.copy(right = dedupRight(left, right))
   // Only after we finish by-name resolution for Union
   case u: Union if !u.byName && !u.duplicateResolved =>

Review Comment:
   Got it. Thank you for your explanation.






Re: [PR] [SPARK-47922][SQL] Implement the try_parse_json expression [spark]

2024-04-26 Thread via GitHub


cloud-fan closed pull request #46141: [SPARK-47922][SQL] Implement the 
try_parse_json expression
URL: https://github.com/apache/spark/pull/46141





Re: [PR] [SPARK-47922][SQL] Implement the try_parse_json expression [spark]

2024-04-26 Thread via GitHub


cloud-fan commented on PR #46141:
URL: https://github.com/apache/spark/pull/46141#issuecomment-2078722106

   thanks, merging to master!





Re: [PR] [SPARK-47999][SS] Improve logging around snapshot creation and adding/removing entries from state cache map in HDFS backed state store provider [spark]

2024-04-26 Thread via GitHub


HeartSaVioR closed pull request #46233: [SPARK-47999][SS] Improve logging 
around snapshot creation and adding/removing entries from state cache map in 
HDFS backed state store provider
URL: https://github.com/apache/spark/pull/46233





Re: [PR] [SPARK-48001][CORE] Remove unused `private implicit def arrayToArrayWritable` from `SparkContext` [spark]

2024-04-26 Thread via GitHub


yaooqinn closed pull request #46238: [SPARK-48001][CORE] Remove unused `private 
implicit def arrayToArrayWritable` from `SparkContext`
URL: https://github.com/apache/spark/pull/46238





Re: [PR] [SPARK-47999] Improve logging around snapshot creation and adding/removing entries from state cache map [spark]

2024-04-26 Thread via GitHub


anishshri-db commented on code in PR #46233:
URL: https://github.com/apache/spark/pull/46233#discussion_r1580520020


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##
@@ -437,6 +437,23 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   private def putStateIntoStateCacheMap(
   newVersion: Long,
   map: HDFSBackedStateStoreMap): Unit = synchronized {
+val loadedEntries = loadedMaps.size()
+val earliestLoadedVersion: Option[Long] = if (loadedEntries > 0) {
+  Some(loadedMaps.lastKey())
+} else {
+  None
+}
+
+if (earliestLoadedVersion.isDefined) {
+  logInfo(s"Trying to add version=$newVersion to state cache map with " +
+s"current_size=$loadedEntries and earliest_loaded_version=${earliestLoadedVersion.get} " +

Review Comment:
   I think it's OK to just know the earliest? We know the current size anyway, 
and versions are supposed to be sequential here since we call this function on 
each commit.






Re: [PR] [SPARK-48001][CORE] Remove unused `private implicit def arrayToArrayWritable` from `SparkContext` [spark]

2024-04-26 Thread via GitHub


LuciferYang commented on PR #46238:
URL: https://github.com/apache/spark/pull/46238#issuecomment-2078804299

   Thanks @yaooqinn 





Re: [PR] [SPARK-48001][CORE] Remove unused `private implicit def arrayToArrayWritable` from `SparkContext` [spark]

2024-04-26 Thread via GitHub


yaooqinn commented on PR #46238:
URL: https://github.com/apache/spark/pull/46238#issuecomment-2078803704

   Merged to master. Thank you @LuciferYang 





Re: [PR] [SPARK-47999] Improve logging around snapshot creation and adding/removing entries from state cache map [spark]

2024-04-26 Thread via GitHub


HeartSaVioR commented on code in PR #46233:
URL: https://github.com/apache/spark/pull/46233#discussion_r1580548906


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##
@@ -437,6 +437,23 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   private def putStateIntoStateCacheMap(
   newVersion: Long,
   map: HDFSBackedStateStoreMap): Unit = synchronized {
+val loadedEntries = loadedMaps.size()
+val earliestLoadedVersion: Option[Long] = if (loadedEntries > 0) {
+  Some(loadedMaps.lastKey())
+} else {
+  None
+}
+
+if (earliestLoadedVersion.isDefined) {
+  logInfo(s"Trying to add version=$newVersion to state cache map with " +
+s"current_size=$loadedEntries and earliest_loaded_version=${earliestLoadedVersion.get} " +

Review Comment:
   Yeah, we could probably calculate back. Since we don't need to look into 
this message every time (probably only when investigating), some effort on 
calculating back is probably OK.
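
The "calculate back" step can be sketched as simple arithmetic, assuming (as suggested in this thread) that the cached versions are consecutive. `loaded_version_range` is a hypothetical helper used only for illustration; it is not part of `HDFSBackedStateStoreProvider`.

```python
def loaded_version_range(earliest: int, size: int) -> range:
    """Versions held in the cache, assuming they are consecutive (an assumption)."""
    if size <= 0:
        return range(0)
    return range(earliest, earliest + size)


# Values as they might appear in the log line: earliest_loaded_version=7, current_size=3.
versions = loaded_version_range(earliest=7, size=3)
latest = versions[-1]  # earliest + size - 1
```

If the versions were ever non-consecutive, this reconstruction would be wrong, so it only holds while the function is called once per commit.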






[PR] [SPARK-48002][PYTHON][SS] Add test for observed metrics in PySpark StreamingQueryListener [spark]

2024-04-26 Thread via GitHub


WweiL opened a new pull request, #46237:
URL: https://github.com/apache/spark/pull/46237

   
   
   ### What changes were proposed in this pull request?
   
   Following the doc test revisit PR 
https://github.com/apache/spark/pull/46189, for extra safety, add a unit test 
that verifies observed metrics work for StreamingQueryListener in both classic 
Spark and Spark Connect.
   
   ### Why are the changes needed?
   
   Additional test coverage 
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Test only addition
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No





Re: [PR] [SPARK-47955][SQL] Improve `DeduplicateRelations` performance [spark]

2024-04-26 Thread via GitHub


beliefer commented on PR #46183:
URL: https://github.com/apache/spark/pull/46183#issuecomment-2078791435

   Late LGTM.





Re: [PR] [SPARK-47409][SQL] Add support for collation for StringTrim type of functions/expressions [spark]

2024-04-26 Thread via GitHub


mihailom-db commented on code in PR #46206:
URL: https://github.com/apache/spark/pull/46206#discussion_r1580521078


##
sql/core/src/test/scala/org/apache/spark/sql/CollationStringExpressionsSuite.scala:
##
@@ -608,6 +610,181 @@ class CollationStringExpressionsSuite
 assert(sql(query).schema.fields.head.dataType.sameType(StringType(0)))
   }
 
+  test("StringTrim* functions - unit tests for both paths (codegen and eval)") 
{
+// Without trimString param.
+checkEvaluation(StringTrim(Literal.create( "  asd  ", StringType(0))), 
"asd")
+checkEvaluation(StringTrimLeft(Literal.create("  asd  ", StringType(0))), 
"asd  ")
+checkEvaluation(StringTrimRight(Literal.create("  asd  ", StringType(0))), 
"  asd")
+
+// With trimString param.
+checkEvaluation(
+  StringTrim(Literal.create("  asd  ", StringType(0)), Literal.create(" ", 
StringType(0))),
+  "asd")
+checkEvaluation(
+  StringTrimLeft(Literal.create("  asd  ", StringType(0)), 
Literal.create(" ", StringType(0))),
+  "asd  ")
+checkEvaluation(
+  StringTrimRight(Literal.create("  asd  ", StringType(0)), 
Literal.create(" ", StringType(0))),
+  "  asd")
+
+checkEvaluation(
+  StringTrim(Literal.create("xxasdxx", StringType(0)), Literal.create("x", 
StringType(0))),
+  "asd")
+checkEvaluation(
+  StringTrimLeft(Literal.create("xxasdxx", StringType(0)), 
Literal.create("x", StringType(0))),
+  "asdxx")
+checkEvaluation(
+  StringTrimRight(Literal.create("xxasdxx", StringType(0)), 
Literal.create("x", StringType(0))),
+  "xxasd")
+  }

Review Comment:
   I am not sure we need this test that much when we have E2E tests, but if we 
intend to keep it, please use StringType(collation_name), as it is more 
readable for anyone who wants to review the code. Extracting that name as 
a val collationName = "UTF8_BINARY" could also be good.






Re: [PR] [SPARK-47950] Add Java API Module for Spark Operator [spark-kubernetes-operator]

2024-04-26 Thread via GitHub


dongjoon-hyun commented on code in PR #8:
URL: https://github.com/apache/spark-kubernetes-operator/pull/8#discussion_r1581254368


##
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationStateSummary.java:
##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.status;
+
+import java.util.Set;
+
+public enum ApplicationStateSummary implements BaseStateSummary {
+  /** Spark application is submitted to the cluster but yet scheduled */
+  Submitted,
+
+  /** Spark application will be restarted with same configuration */
+  ScheduledToRestart,
+
+  /** A request has been made to start driver pod in the cluster */
+  DriverRequested,
+
+  /**
+   * Driver pod has reached 'Running' state and thus bound to a node Refer
+   * https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
+   */
+  DriverStarted,
+
+  /**
+   * Driver pod is ready to serve connections from executors Refer Refer
+   * https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
+   */
+  DriverReady,
+
+  /**
+   * Less that minimal required executor pods reached condition 'Ready' during 
starting up Note that
+   * reaching 'Ready' does not necessarily mean that the executor has 
successfully registered with
+   * driver. This is a best-effort from operator to detect executor status
+   */
+  InitializedBelowThresholdExecutors,
+
+  /**
+   * All required executor pods started reached condition 'Ready' Note that reaching 'Ready' does
+   * not necessarily mean that the executor has successfully registered with driver. This is a
+   * best-effort from operator to detect executor status
+   */
+  RunningHealthy,
+
+  /** The application has lost a fraction of executors for external reasons */
+  RunningWithBelowThresholdExecutors,
+
+  /** The request timed out for driver */
+  DriverStartTimedOut,
+
+  /** The request timed out for executors */
+  ExecutorsStartTimedOut,
+
+  /** Timed out waiting for driver to become ready */
+  DriverReadyTimedOut,
+
+  /**
+   * The application completed successfully, or System.exit() is called explicitly with zero state
+   */
+  Succeeded,
+
+  /**
+   * The application has failed, JVM exited abnormally, or System.exit is called explicitly with
+   * non-zero state
+   */
+  Failed,
+
+  /**
+   * Operator failed to orchestrate Spark application in cluster. For example, the given pod
+   * template is rejected by API server because it's invalid or does not meet cluster security
+   * standard; operator is not able to schedule pods due to insufficient quota or missing RBAC
+   */
+  SchedulingFailure,
+
+  /** The driver pod was failed with Evicted reason */
+  DriverEvicted,
+
+  /** all resources (pods, services .etc have been cleaned up) */
+  ResourceReleased,
+
+  /**
+   * If configured, operator may mark app as terminated without releasing resources. While this can
+   * be helpful in dev phase, it shall not be enabled for prod use cases
+   */
+  TerminatedWithoutReleaseResources;
+
+  public boolean isInitializing() {
+    return Submitted.equals(this) || ScheduledToRestart.equals(this);
+  }
+
+  public boolean isStarting() {
+    return ScheduledToRestart.ordinal() < this.ordinal()
+        && RunningHealthy.ordinal() > this.ordinal();
+  }
+
+  public boolean isTerminated() {
+    return ResourceReleased.equals(this) || TerminatedWithoutReleaseResources.equals(this);

Review Comment:
   I'm wondering if `isTerminated` considers the following.
   - `ResourceRetainPolicy.Always`
   - `ResourceRetainPolicy.OnFailure`






Re: [PR] [SPARK-47950] Add Java API Module for Spark Operator [spark-kubernetes-operator]

2024-04-26 Thread via GitHub


dongjoon-hyun commented on code in PR #8:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/8#discussion_r1581252300


##
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationStateSummary.java:
##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.status;
+
+import java.util.Set;
+
+public enum ApplicationStateSummary implements BaseStateSummary {
+  /** Spark application is submitted to the cluster but yet scheduled */
+  Submitted,
+
+  /** Spark application will be restarted with same configuration */
+  ScheduledToRestart,
+
+  /** A request has been made to start driver pod in the cluster */
+  DriverRequested,
+
+  /**
+   * Driver pod has reached 'Running' state and thus bound to a node Refer
+   * https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
+   */
+  DriverStarted,
+
+  /**
+   * Driver pod is ready to serve connections from executors Refer Refer
+   * https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
+   */
+  DriverReady,
+
+  /**
+   * Less that minimal required executor pods reached condition 'Ready' during starting up Note that
+   * reaching 'Ready' does not necessarily mean that the executor has successfully registered with
+   * driver. This is a best-effort from operator to detect executor status
+   */
+  InitializedBelowThresholdExecutors,
+
+  /**
+   * All required executor pods started reached condition 'Ready' Note that reaching 'Ready' does
+   * not necessarily mean that the executor has successfully registered with driver. This is a
+   * best-effort from operator to detect executor status
+   */
+  RunningHealthy,
+
+  /** The application has lost a fraction of executors for external reasons */
+  RunningWithBelowThresholdExecutors,
+
+  /** The request timed out for driver */
+  DriverStartTimedOut,
+
+  /** The request timed out for executors */
+  ExecutorsStartTimedOut,
+
+  /** Timed out waiting for driver to become ready */
+  DriverReadyTimedOut,
+
+  /**
+   * The application completed successfully, or System.exit() is called explicitly with zero state
+   */
+  Succeeded,
+
+  /**
+   * The application has failed, JVM exited abnormally, or System.exit is called explicitly with
+   * non-zero state
+   */
+  Failed,
+
+  /**
+   * Operator failed to orchestrate Spark application in cluster. For example, the given pod
+   * template is rejected by API server because it's invalid or does not meet cluster security
+   * standard; operator is not able to schedule pods due to insufficient quota or missing RBAC
+   */
+  SchedulingFailure,
+
+  /** The driver pod was failed with Evicted reason */
+  DriverEvicted,
+
+  /** all resources (pods, services .etc have been cleaned up) */
+  ResourceReleased,
+
+  /**
+   * If configured, operator may mark app as terminated without releasing resources. While this can
+   * be helpful in dev phase, it shall not be enabled for prod use cases
+   */
+  TerminatedWithoutReleaseResources;
+
+  public boolean isInitializing() {
+    return Submitted.equals(this) || ScheduledToRestart.equals(this);
+  }
+
+  public boolean isStarting() {
+    return ScheduledToRestart.ordinal() < this.ordinal()
+        && RunningHealthy.ordinal() > this.ordinal();
+  }
+
+  public boolean isTerminated() {

Review Comment:
   `DriverEvicted` is not a part of this?






Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1581304652


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+    DataSource,
+    DataSourceStreamReader,
+    InputPartition,
+    SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> "DataSourceStreamReader":
+    """
+    Fallback to simpleStreamReader() method when streamReader() is not implemented.
+    This should be invoked whenever a DataSourceStreamReader needs to be created instead of
+    invoking datasource.streamReader() directly.
+    """
+    try:
+        return datasource.streamReader(schema=schema)
+    except PySparkNotImplementedError:
+        return _SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):
+    def __init__(self, start: dict, end: dict):
+        self.start = start
+        self.end = end
+
+
+class PrefetchedCacheEntry:
+    def __init__(self, start: dict, end: dict, iterator: Iterator[Tuple]):
+        self.start = start
+        self.end = end
+        self.iterator = iterator
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+    """
+    A private class that wrap :class:`SimpleDataSourceStreamReader` in prefetch and cache pattern,
+    so that :class:`SimpleDataSourceStreamReader` can integrate with streaming engine like an
+    ordinary :class:`DataSourceStreamReader`.
+
+    current_offset tracks the latest progress of the record prefetching, it is initialized to be
+    initialOffset() when query start for the first time or initialized to be the end offset of
+    the last committed batch when query restarts.
+
+    When streaming engine calls latestOffset(), the wrapper calls read() that starts from
+    current_offset, prefetches and cache the data, then updates the current_offset to be
+    the end offset of the new data.
+
+    When streaming engine call planInputPartitions(start, end), the wrapper get the prefetched data
+    from cache and send it to JVM along with the input partitions.
+
+    When query restart, batches in write ahead offset log that has not been committed will be
+    replayed by reading data between start and end offset through read2(start, end).
+    """
+
+    def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+        self.simple_reader = simple_reader
+        self.initial_offset: Optional[dict] = None
+        self.current_offset: Optional[dict] = None
+        self.cache: List[PrefetchedCacheEntry] = []
+
+    def initialOffset(self) -> dict:
+        if self.initial_offset is None:
+            self.initial_offset = self.simple_reader.initialOffset()
+        return self.initial_offset
+
+    def latestOffset(self) -> dict:
+        # when query start for the first time, use initial offset as the start offset.

Review Comment:
   Yes, we have tests where the query is restarted multiple times, and they verify
   that microbatch replay succeeds.
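
For readers following the thread, the prefetch-and-cache flow described in the quoted docstring can be sketched roughly as below. This is a minimal standalone sketch and not the PR's implementation: `ToyReader`, `PrefetchCacheSketch`, the `prefetched` helper, and the `{"offset": n}` offset shape are all hypothetical names chosen for illustration, and nothing here depends on PySpark.

```python
from typing import Dict, Iterator, List, Optional, Tuple

Offset = Dict[str, int]


class ToyReader:
    """Hypothetical stand-in for a simple reader; offsets look like {"offset": n}."""

    def initialOffset(self) -> Offset:
        return {"offset": 0}

    def read(self, start: Offset) -> Tuple[Iterator[Tuple], Offset]:
        # Return the two records after `start` and the offset that follows them.
        begin = start["offset"]
        return iter([(begin,), (begin + 1,)]), {"offset": begin + 2}


class PrefetchCacheSketch:
    """Illustrative wrapper: prefetch in latestOffset(), serve from cache later."""

    def __init__(self, reader: ToyReader) -> None:
        self.reader = reader
        self.current_offset: Optional[Offset] = None
        self.cache: List[Tuple[Offset, Offset, List[Tuple]]] = []

    def latestOffset(self) -> Offset:
        # First call: start from the reader's initial offset.
        if self.current_offset is None:
            self.current_offset = self.reader.initialOffset()
        start = self.current_offset
        records, end = self.reader.read(start)
        # Cache the prefetched records keyed by their (start, end) range.
        self.cache.append((start, end, list(records)))
        self.current_offset = end
        return end

    def prefetched(self, start: Offset, end: Offset) -> Optional[List[Tuple]]:
        # Collect cached records from the entry starting at `start` through the
        # entry ending at `end`; return None on a cache miss.
        collected: List[Tuple] = []
        collecting = False
        for entry_start, entry_end, records in self.cache:
            if entry_start == start:
                collecting = True
            if collecting:
                collected.extend(records)
                if entry_end == end:
                    return collected
        return None
```

A cache miss (`None`) corresponds to the restart case in the docstring, where an uncommitted batch would instead be replayed by reading between the start and end offsets.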






Re: [PR] [SPARK-47050][SQL] Collect and publish partition level metrics for V1 [spark]

2024-04-26 Thread via GitHub


dbtsai commented on PR #46188:
URL: https://github.com/apache/spark/pull/46188#issuecomment-2079779162

   Gently pinging @cloud-fan 





Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1581319890


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+    DataSource,
+    DataSourceStreamReader,
+    InputPartition,
+    SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> "DataSourceStreamReader":
+    """
+    Fallback to simpleStreamReader() method when streamReader() is not implemented.
+    This should be invoked whenever a DataSourceStreamReader needs to be created instead of
+    invoking datasource.streamReader() directly.
+    """
+    try:
+        return datasource.streamReader(schema=schema)
+    except PySparkNotImplementedError:
+        return _SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):

Review Comment:
   How do we document it? Do we add a link to the Python data source user guide
   in the Python user guide?






Re: [PR] [SPARK-47963][CORE] Make the external Spark ecosystem can use structured logging mechanisms [spark]

2024-04-26 Thread via GitHub


gengliangwang closed pull request #46193: [SPARK-47963][CORE] Make the external 
Spark ecosystem can use structured logging mechanisms
URL: https://github.com/apache/spark/pull/46193





Re: [PR] [SPARK-47950] Add Java API Module for Spark Operator [spark-kubernetes-operator]

2024-04-26 Thread via GitHub


jiangzho commented on code in PR #8:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/8#discussion_r1581331728


##
spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/RestartPolicyTest.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.spec;
+
+import static org.apache.spark.k8s.operator.spec.RestartPolicy.OnFailure;
+import static org.apache.spark.k8s.operator.spec.RestartPolicy.OnInfrastructureFailure;
+import static org.apache.spark.k8s.operator.spec.RestartPolicy.attemptRestartOnState;
+import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.DriverEvicted;
+import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.DriverReadyTimedOut;
+import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.DriverStartTimedOut;
+import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.ExecutorsStartTimedOut;
+import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.Failed;
+import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.Succeeded;

Review Comment:
   It's actually the Java style guide that discourages wildcard imports
   (https://github.com/apache/spark-kubernetes-operator/pull/8#discussion_r1580245635),
   so Spotless marks them as a violation.






Re: [PR] [SPARK-47950] Add Java API Module for Spark Operator [spark-kubernetes-operator]

2024-04-26 Thread via GitHub


jiangzho commented on code in PR #8:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/8#discussion_r1581331444


##
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationStateSummary.java:
##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.status;
+
+import java.util.Set;
+
+public enum ApplicationStateSummary implements BaseStateSummary {
+  /** Spark application is submitted to the cluster but yet scheduled */
+  Submitted,
+
+  /** Spark application will be restarted with same configuration */
+  ScheduledToRestart,
+
+  /** A request has been made to start driver pod in the cluster */
+  DriverRequested,
+
+  /**
+   * Driver pod has reached 'Running' state and thus bound to a node Refer
+   * https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
+   */
+  DriverStarted,
+
+  /**
+   * Driver pod is ready to serve connections from executors Refer Refer
+   * https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
+   */
+  DriverReady,
+
+  /**
+   * Less that minimal required executor pods reached condition 'Ready' during starting up Note that
+   * reaching 'Ready' does not necessarily mean that the executor has successfully registered with
+   * driver. This is a best-effort from operator to detect executor status
+   */
+  InitializedBelowThresholdExecutors,
+
+  /**
+   * All required executor pods started reached condition 'Ready' Note that reaching 'Ready' does
+   * not necessarily mean that the executor has successfully registered with driver. This is a
+   * best-effort from operator to detect executor status
+   */
+  RunningHealthy,
+
+  /** The application has lost a fraction of executors for external reasons */
+  RunningWithBelowThresholdExecutors,
+
+  /** The request timed out for driver */
+  DriverStartTimedOut,
+
+  /** The request timed out for executors */
+  ExecutorsStartTimedOut,
+
+  /** Timed out waiting for driver to become ready */
+  DriverReadyTimedOut,
+
+  /**
+   * The application completed successfully, or System.exit() is called explicitly with zero state
+   */
+  Succeeded,
+
+  /**
+   * The application has failed, JVM exited abnormally, or System.exit is called explicitly with
+   * non-zero state
+   */
+  Failed,
+
+  /**
+   * Operator failed to orchestrate Spark application in cluster. For example, the given pod
+   * template is rejected by API server because it's invalid or does not meet cluster security
+   * standard; operator is not able to schedule pods due to insufficient quota or missing RBAC
+   */
+  SchedulingFailure,
+
+  /** The driver pod was failed with Evicted reason */
+  DriverEvicted,
+
+  /** all resources (pods, services .etc have been cleaned up) */
+  ResourceReleased,
+
+  /**
+   * If configured, operator may mark app as terminated without releasing resources. While this can
+   * be helpful in dev phase, it shall not be enabled for prod use cases
+   */
+  TerminatedWithoutReleaseResources;
+
+  public boolean isInitializing() {
+    return Submitted.equals(this) || ScheduledToRestart.equals(this);
+  }
+
+  public boolean isStarting() {
+    return ScheduledToRestart.ordinal() < this.ordinal()
+        && RunningHealthy.ordinal() > this.ordinal();
+  }
+
+  public boolean isTerminated() {

Review Comment:
   It is not. It's a 'stopping' state, like Succeeded / Failed etc.
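
To make the distinction concrete, here is a rough Python sketch of the ordinal-based classification in the quoted enum. It is only an illustration under stated assumptions: `StateSketch` and the three helper functions are hypothetical names, the member order is copied from the quoted Java enum, and the `is_terminated` body mirrors the `isTerminated()` body quoted earlier in this thread.

```python
from enum import IntEnum

# Declaration order copied from the quoted Java enum; the integer values
# stand in for Java's ordinal().
_NAMES = [
    "Submitted",
    "ScheduledToRestart",
    "DriverRequested",
    "DriverStarted",
    "DriverReady",
    "InitializedBelowThresholdExecutors",
    "RunningHealthy",
    "RunningWithBelowThresholdExecutors",
    "DriverStartTimedOut",
    "ExecutorsStartTimedOut",
    "DriverReadyTimedOut",
    "Succeeded",
    "Failed",
    "SchedulingFailure",
    "DriverEvicted",
    "ResourceReleased",
    "TerminatedWithoutReleaseResources",
]
StateSketch = IntEnum("StateSketch", _NAMES, start=0)


def is_initializing(state: StateSketch) -> bool:
    return state in (StateSketch.Submitted, StateSketch.ScheduledToRestart)


def is_starting(state: StateSketch) -> bool:
    # Strictly between ScheduledToRestart and RunningHealthy in declaration order.
    return StateSketch.ScheduledToRestart < state < StateSketch.RunningHealthy


def is_terminated(state: StateSketch) -> bool:
    # Only the two final states count; DriverEvicted, Succeeded and Failed do not.
    return state in (
        StateSketch.ResourceReleased,
        StateSketch.TerminatedWithoutReleaseResources,
    )
```

Because `is_starting` depends on declaration order, reordering the enum members would silently change its result, which is one reason to review such ordinal comparisons carefully.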



##
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationStateSummary.java:
##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" 

Re: [PR] [SPARK-46350][SS] Fix state removal for stream-stream join with one watermark and one time-interval condition [spark]

2024-04-26 Thread via GitHub


rangadi commented on code in PR #44323:
URL: https://github.com/apache/spark/pull/44323#discussion_r1581366231


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala:
##
@@ -219,10 +222,35 @@ object StreamingSymmetricHashJoinHelper extends Logging {
   attributesWithEventWatermark = AttributeSet(otherSideInputAttributes),
   condition,
   eventTimeWatermarkForEviction)
-val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey))
-val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark)
-expr.map(JoinStateValueWatermarkPredicate.apply _)
 
+// For example, if the condition is of the form:
+//left_time > right_time + INTERVAL 30 MINUTES
+// Then this extracts left_time and right_time.
+val attributesInCondition = AttributeSet(
+  condition.get.collect { case a: AttributeReference => a }
+)
+
+// Construct an AttributeSet so that we can perform equality between attributes,
+// which we do in the filter below.
+val oneSideInputAttributeSet = AttributeSet(oneSideInputAttributes)
+
+// oneSideInputAttributes could be [left_value, left_time], and we just
+// want the attribute _in_ the time-interval condition.
+val oneSideStateWatermarkAttributes = attributesInCondition.filter { a =>
+oneSideInputAttributeSet.contains(a)

Review Comment:
   Can you give an example? 
   
   > Is this assured to be left_time mentioned in the comment?
   
   What about this part? 



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala:
##
@@ -219,10 +222,41 @@ object StreamingSymmetricHashJoinHelper extends Logging {
   attributesWithEventWatermark = AttributeSet(otherSideInputAttributes),
   condition,
   eventTimeWatermarkForEviction)
-val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey))
-val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark)
-expr.map(JoinStateValueWatermarkPredicate.apply _)
 
+// If the condition itself is empty (for example, left_time < left_time + INTERVAL ...),
+// then we will not have generated a stateValueWatermark.
+if (stateValueWatermark.isEmpty) {
+  None
+} else {
+  // For example, if the condition is of the form:
+  //left_time > right_time + INTERVAL 30 MINUTES
+  // Then this extracts left_time and right_time.

Review Comment:
   Is `condition` here only the time-interval part of the join condition? (e.g.
   consider `A.id = B.id AND A.ts > B.ts + INTERVAL 30 MINUTES`).



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala:
##
@@ -219,10 +222,35 @@ object StreamingSymmetricHashJoinHelper extends Logging {
   attributesWithEventWatermark = AttributeSet(otherSideInputAttributes),
   condition,
   eventTimeWatermarkForEviction)
-val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey))

Review Comment:
   > filtering for the 
   
   Did you mean filtering out? 
   
   > Effectively, this line is equivalent to oneSideStateWatermarkAttributes.head.
   
   `This line`: Is that the line you removed? 
   



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala:
##
@@ -219,10 +222,41 @@ object StreamingSymmetricHashJoinHelper extends Logging {
   attributesWithEventWatermark = AttributeSet(otherSideInputAttributes),
   condition,
   eventTimeWatermarkForEviction)
-val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey))
-val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark)
-expr.map(JoinStateValueWatermarkPredicate.apply _)
 
+// If the condition itself is empty (for example, left_time < left_time + INTERVAL ...),
+// then we will not have generated a stateValueWatermark.
+if (stateValueWatermark.isEmpty) {
+  None
+} else {
+  // For example, if the condition is of the form:
+  //left_time > right_time + INTERVAL 30 MINUTES
+  // Then this extracts left_time and right_time.

Review Comment:
   I.e. can there be an assert here that `attributesInCondition` is exactly two 
timestamps, one on each side? 
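
The kind of check being asked about could look roughly like the sketch below. It is a loose Python illustration rather than the Scala code under review: attributes are modeled as plain strings, and `one_side_condition_attrs` is a hypothetical helper that keeps only the condition attributes belonging to one join side.

```python
from typing import List, Sequence


def one_side_condition_attrs(
    condition_attrs: Sequence[str], one_side_attrs: Sequence[str]
) -> List[str]:
    # Keep only the attributes referenced in the condition that belong to this side.
    one_side = set(one_side_attrs)
    return [attr for attr in condition_attrs if attr in one_side]


# For a condition such as: left_time > right_time + INTERVAL 30 MINUTES
condition_attrs = ["left_time", "right_time"]
left = one_side_condition_attrs(condition_attrs, ["left_value", "left_time"])
right = one_side_condition_attrs(condition_attrs, ["right_value", "right_time"])

# The suggested assertion: exactly one time attribute from each side.
assert len(left) == 1 and len(right) == 1
```

Whether such an assertion always holds depends on what `condition` can contain at this point, which is the open question in the comments above.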



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala:
##
@@ -219,10 +222,41 @@ object StreamingSymmetricHashJoinHelper extends Logging {
   attributesWithEventWatermark = 

Re: [PR] [WIP] Ensure that ForeachBatch can use libraries imported externally [spark]

2024-04-26 Thread via GitHub


ericm-db closed pull request #46191: [WIP] Ensure that ForeachBatch can use 
libraries imported externally
URL: https://github.com/apache/spark/pull/46191





Re: [PR] [SPARK-48010][SQL] Avoid repeated calls to conf.resolver in resolveExpression [spark]

2024-04-26 Thread via GitHub


dongjoon-hyun commented on PR #46248:
URL: https://github.com/apache/spark/pull/46248#issuecomment-2079911328

   Merged to master for Apache Spark 4.0.0.





Re: [PR] [SPARK-47986][CONNECT][PYTHON] Unable to create a new session when the default session is closed by the server [spark]

2024-04-26 Thread via GitHub


juliuszsompolski commented on PR #46221:
URL: https://github.com/apache/spark/pull/46221#issuecomment-2079934132

   > yes we most likely need the same for Scala.
   
   @nemanja-boric-databricks @nija-at would one of you have time to follow up,
   check, and fix this for Scala if needed?





Re: [PR] [SPARK-47818][CONNECT][FOLLOW-UP] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests [spark]

2024-04-26 Thread via GitHub


hvanhovell closed pull request #46098: [SPARK-47818][CONNECT][FOLLOW-UP] 
Introduce plan cache in SparkConnectPlanner to improve performance of Analyze 
requests
URL: https://github.com/apache/spark/pull/46098





Re: [PR] [SPARK-47818][CONNECT][FOLLOW-UP] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests [spark]

2024-04-26 Thread via GitHub


hvanhovell commented on PR #46098:
URL: https://github.com/apache/spark/pull/46098#issuecomment-2079778912

   Merging.





Re: [PR] [SPARK-47950] Add Java API Module for Spark Operator [spark-kubernetes-operator]

2024-04-26 Thread via GitHub


dongjoon-hyun commented on code in PR #8:
URL: https://github.com/apache/spark-kubernetes-operator/pull/8#discussion_r1581258317


##
spark-operator-api/src/test/java/org/apache/spark/k8s/operator/spec/RestartPolicyTest.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.spec;
+
+import static org.apache.spark.k8s.operator.spec.RestartPolicy.OnFailure;
+import static org.apache.spark.k8s.operator.spec.RestartPolicy.OnInfrastructureFailure;
+import static org.apache.spark.k8s.operator.spec.RestartPolicy.attemptRestartOnState;
+import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.DriverEvicted;
+import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.DriverReadyTimedOut;
+import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.DriverStartTimedOut;
+import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.ExecutorsStartTimedOut;
+import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.Failed;
+import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.Succeeded;

Review Comment:
   I suggested importing `*`, but is this the result of one of the `StaticAnalyzers`?






Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1581314644


##
python/pyspark/sql/datasource_internal.py:
##
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import json
+import copy
+from itertools import chain
+from typing import Iterator, List, Optional, Sequence, Tuple
+
+from pyspark.sql.datasource import (
+    DataSource,
+    DataSourceStreamReader,
+    InputPartition,
+    SimpleDataSourceStreamReader,
+)
+from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
+
+
+def _streamReader(datasource: DataSource, schema: StructType) -> "DataSourceStreamReader":
+    """
+    Fallback to simpleStreamReader() method when streamReader() is not implemented.
+    This should be invoked whenever a DataSourceStreamReader needs to be created instead of
+    invoking datasource.streamReader() directly.
+    """
+    try:
+        return datasource.streamReader(schema=schema)
+    except PySparkNotImplementedError:
+        return _SimpleStreamReaderWrapper(datasource.simpleStreamReader(schema=schema))
+
+
+class SimpleInputPartition(InputPartition):
+    def __init__(self, start: dict, end: dict):
+        self.start = start
+        self.end = end
+
+
+class PrefetchedCacheEntry:
+    def __init__(self, start: dict, end: dict, iterator: Iterator[Tuple]):
+        self.start = start
+        self.end = end
+        self.iterator = iterator
+
+
+class _SimpleStreamReaderWrapper(DataSourceStreamReader):
+    """
+    A private class that wrap :class:`SimpleDataSourceStreamReader` in prefetch and cache pattern,
+    so that :class:`SimpleDataSourceStreamReader` can integrate with streaming engine like an
+    ordinary :class:`DataSourceStreamReader`.
+
+    current_offset tracks the latest progress of the record prefetching, it is initialized to be
+    initialOffset() when query start for the first time or initialized to be the end offset of
+    the last committed batch when query restarts.
+
+    When streaming engine calls latestOffset(), the wrapper calls read() that starts from
+    current_offset, prefetches and cache the data, then updates the current_offset to be
+    the end offset of the new data.
+
+    When streaming engine call planInputPartitions(start, end), the wrapper get the prefetched data
+    from cache and send it to JVM along with the input partitions.
+
+    When query restart, batches in write ahead offset log that has not been committed will be
+    replayed by reading data between start and end offset through read2(start, end).
+    """
+
+    def __init__(self, simple_reader: SimpleDataSourceStreamReader):
+        self.simple_reader = simple_reader
+        self.initial_offset: Optional[dict] = None
+        self.current_offset: Optional[dict] = None
+        self.cache: List[PrefetchedCacheEntry] = []
+
+    def initialOffset(self) -> dict:
+        if self.initial_offset is None:
+            self.initial_offset = self.simple_reader.initialOffset()
+        return self.initial_offset
+
+    def latestOffset(self) -> dict:
+        # when query start for the first time, use initial offset as the start offset.

Review Comment:
   I realized the trick only works for V1 sources and added separate 
handling; let me also update the comment here.
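
The prefetch-and-cache flow described in the wrapper docstring quoted above can be illustrated with a minimal, self-contained Python sketch. It is not the code under review: `CounterReader`, `PrefetchingWrapper`, and `prefetched` are hypothetical names, the `read(start)` signature returning `(iterator, end_offset)` is an assumption, and the cache lookup here matches only exact `(start, end)` pairs.

```python
from typing import Iterator, List, Optional, Tuple


class CounterReader:
    """Hypothetical simple reader: offsets look like {"offset": n}."""

    def initialOffset(self) -> dict:
        return {"offset": 0}

    def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
        # Assumed contract: return the records available from `start`
        # together with the offset at which the next read should begin.
        begin = start["offset"]
        return iter([(begin,), (begin + 1,)]), {"offset": begin + 2}


class PrefetchingWrapper:
    """Illustrative prefetch-and-cache wrapper, not the PR's implementation."""

    def __init__(self, reader: CounterReader):
        self.reader = reader
        self.current_offset: Optional[dict] = None
        # Each entry is (start offset, end offset, materialized rows).
        self.cache: List[Tuple[dict, dict, List[Tuple]]] = []

    def latestOffset(self) -> dict:
        # On the first call, start from the reader's initial offset.
        if self.current_offset is None:
            self.current_offset = self.reader.initialOffset()
        start = self.current_offset
        records, end = self.reader.read(start)
        # Cache the prefetched rows, then advance the tracked offset.
        self.cache.append((start, end, list(records)))
        self.current_offset = end
        return end

    def prefetched(self, start: dict, end: dict) -> Optional[List[Tuple]]:
        # Return cached rows for an exact (start, end) range, if present.
        for cached_start, cached_end, rows in self.cache:
            if cached_start == start and cached_end == end:
                return rows
        return None


wrapper = PrefetchingWrapper(CounterReader())
first_end = wrapper.latestOffset()
second_end = wrapper.latestOffset()
first_batch = wrapper.prefetched({"offset": 0}, first_end)
second_batch = wrapper.prefetched(first_end, second_end)
```

Each `latestOffset()` call advances `current_offset` and leaves one cache entry behind, so a later lookup for that exact range can be served without reading the source again.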






Re: [PR] [SPARK-47986][CONNECT][PYTHON] Unable to create a new session when the default session is closed by the server [spark]

2024-04-26 Thread via GitHub


nemanja-boric-databricks commented on PR #46221:
URL: https://github.com/apache/spark/pull/46221#issuecomment-2079973373

   Yeah, I'll follow up with a fix PR for the Scala client.





[PR] [Draft] - Testing of Streaming and Collations [spark]

2024-04-26 Thread via GitHub


dbatomic opened a new pull request, #46247:
URL: https://github.com/apache/spark/pull/46247

   
   
   ### What changes were proposed in this pull request?
   
   Draft PR for Collation tests in Streaming.
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   





Re: [PR] [SPARK-48007][BUILD] MsSQLServer: upgrade mssql.jdbc.version to 12.6.1.jre11 [spark]

2024-04-26 Thread via GitHub


dongjoon-hyun commented on code in PR #46244:
URL: https://github.com/apache/spark/pull/46244#discussion_r1581168210


##
connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSQLServerDatabaseOnDocker.scala:
##
@@ -28,5 +28,6 @@ class MsSQLServerDatabaseOnDocker extends DatabaseOnDocker {
   override val jdbcPort: Int = 1433
 
   override def getJdbcUrl(ip: String, port: Int): String =
-    s"jdbc:sqlserver://$ip:$port;user=sa;password=Sapass123;"
+    s"jdbc:sqlserver://$ip:$port;user=sa;password=Sapass123;" +
+      "encrypt=true;trustServerCertificate=true"

Review Comment:
   Is this enforced by default from now on?






Re: [PR] [SPARK-47440][SQL][FOLLOWUP] Reenable predicate pushdown for syntax with boolean comparison in MsSqlServer [spark]

2024-04-26 Thread via GitHub


yaooqinn closed pull request #46236: [SPARK-47440][SQL][FOLLOWUP] Reenable predicate pushdown for syntax with boolean comparison in MsSqlServer
URL: https://github.com/apache/spark/pull/46236





Re: [PR] [SPARK-47950] Add Java API Module for Spark Operator [spark-kubernetes-operator]

2024-04-26 Thread via GitHub


dongjoon-hyun commented on code in PR #8:
URL: https://github.com/apache/spark-kubernetes-operator/pull/8#discussion_r1581246394


##
build.gradle:
##
@@ -72,6 +72,8 @@ subprojects {
 '',
 'org.apache.spark',
   )
+  toggleOffOn()
+  targetExclude "**/BaseResource.java"

Review Comment:
   Oh. Got it. This is due to my request.. :( 






Re: [PR] [SPARK-47950] Add Java API Module for Spark Operator [spark-kubernetes-operator]

2024-04-26 Thread via GitHub


dongjoon-hyun commented on code in PR #8:
URL: https://github.com/apache/spark-kubernetes-operator/pull/8#discussion_r1581256983


##
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationStateSummary.java:
##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.status;
+
+import java.util.Set;
+
+public enum ApplicationStateSummary implements BaseStateSummary {
+  /** Spark application is submitted to the cluster but yet scheduled */
+  Submitted,
+
+  /** Spark application will be restarted with same configuration */
+  ScheduledToRestart,
+
+  /** A request has been made to start driver pod in the cluster */
+  DriverRequested,
+
+  /**
+   * Driver pod has reached 'Running' state and thus bound to a node Refer
+   * https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
+   */
+  DriverStarted,
+
+  /**
+   * Driver pod is ready to serve connections from executors Refer Refer
+   * https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
+   */
+  DriverReady,
+
+  /**
+   * Less that minimal required executor pods reached condition 'Ready' during starting up Note that
+   * reaching 'Ready' does not necessarily mean that the executor has successfully registered with
+   * driver. This is a best-effort from operator to detect executor status
+   */
+  InitializedBelowThresholdExecutors,
+
+  /**
+   * All required executor pods started reached condition 'Ready' Note that reaching 'Ready' does
+   * not necessarily mean that the executor has successfully registered with driver. This is a
+   * best-effort from operator to detect executor status
+   */
+  RunningHealthy,
+
+  /** The application has lost a fraction of executors for external reasons */
+  RunningWithBelowThresholdExecutors,
+
+  /** The request timed out for driver */
+  DriverStartTimedOut,
+
+  /** The request timed out for executors */
+  ExecutorsStartTimedOut,
+
+  /** Timed out waiting for driver to become ready */
+  DriverReadyTimedOut,
+
+  /**
+   * The application completed successfully, or System.exit() is called explicitly with zero state
+   */
+  Succeeded,
+
+  /**
+   * The application has failed, JVM exited abnormally, or System.exit is called explicitly with
+   * non-zero state
+   */
+  Failed,
+
+  /**
+   * Operator failed to orchestrate Spark application in cluster. For example, the given pod
+   * template is rejected by API server because it's invalid or does not meet cluster security
+   * standard; operator is not able to schedule pods due to insufficient quota or missing RBAC
+   */
+  SchedulingFailure,
+
+  /** The driver pod was failed with Evicted reason */
+  DriverEvicted,
+
+  /** all resources (pods, services .etc have been cleaned up) */
+  ResourceReleased,
+
+  /**
+   * If configured, operator may mark app as terminated without releasing resources. While this can
+   * be helpful in dev phase, it shall not be enabled for prod use cases
+   */
+  TerminatedWithoutReleaseResources;
+
+  public boolean isInitializing() {
+    return Submitted.equals(this) || ScheduledToRestart.equals(this);
+  }
+
+  public boolean isStarting() {
+    return ScheduledToRestart.ordinal() < this.ordinal()
+        && RunningHealthy.ordinal() > this.ordinal();
+  }
+
+  public boolean isTerminated() {
+    return ResourceReleased.equals(this) || TerminatedWithoutReleaseResources.equals(this);
+  }
+
+  public boolean isStopping() {
+    return RunningWithBelowThresholdExecutors.ordinal() < this.ordinal() && !isTerminated();

Review Comment:
   For my understanding, does this mean `DriverStartTimedOut`, 
`ExecutorsStartTimedOut`, and `DriverReadyTimedOut` are in `isStopping`?
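
One way to check the question above is to replay the ordinal comparison against the declaration order shown in the hunk. The sketch below does that in Python rather than Java. It assumes the enum order in this diff is the one that ships; `is_terminated`, `is_stopping`, and `stopping_states` are illustrative names that mirror the Java methods, not part of the PR.

```python
from enum import IntEnum, auto


class ApplicationStateSummary(IntEnum):
    # Same declaration order as the Java enum in the quoted diff;
    # the integer value plays the role of Java's ordinal().
    Submitted = auto()
    ScheduledToRestart = auto()
    DriverRequested = auto()
    DriverStarted = auto()
    DriverReady = auto()
    InitializedBelowThresholdExecutors = auto()
    RunningHealthy = auto()
    RunningWithBelowThresholdExecutors = auto()
    DriverStartTimedOut = auto()
    ExecutorsStartTimedOut = auto()
    DriverReadyTimedOut = auto()
    Succeeded = auto()
    Failed = auto()
    SchedulingFailure = auto()
    DriverEvicted = auto()
    ResourceReleased = auto()
    TerminatedWithoutReleaseResources = auto()

    def is_terminated(self) -> bool:
        return self in (
            ApplicationStateSummary.ResourceReleased,
            ApplicationStateSummary.TerminatedWithoutReleaseResources,
        )

    def is_stopping(self) -> bool:
        # Mirrors: RunningWithBelowThresholdExecutors.ordinal() < this.ordinal()
        #          && !isTerminated()
        after_running = self > ApplicationStateSummary.RunningWithBelowThresholdExecutors
        return after_running and not self.is_terminated()


stopping_states = [s.name for s in ApplicationStateSummary if s.is_stopping()]
```

Under that assumption, `stopping_states` contains the three timed-out states the comment asks about, plus `Succeeded`, `Failed`, `SchedulingFailure`, and `DriverEvicted`. Because the check depends on declaration order, inserting a new constant in the middle of the enum would silently change the result.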




Re: [PR] [SPARK-48005][PS][CONNECT][TESTS] Enable `DefaultIndexParityTests.test_index_distributed_sequence_cleanup` [spark]

2024-04-26 Thread via GitHub


dongjoon-hyun closed pull request #46242: [SPARK-48005][PS][CONNECT][TESTS] Enable `DefaultIndexParityTests.test_index_distributed_sequence_cleanup`
URL: https://github.com/apache/spark/pull/46242





Re: [PR] [SPARK-46122][SQL] Set `spark.sql.legacy.createHiveTableByDefault` to `false` by default [spark]

2024-04-26 Thread via GitHub


dongjoon-hyun commented on PR #46207:
URL: https://github.com/apache/spark/pull/46207#issuecomment-2079759481

   I started a vote for this PR too.
   - https://lists.apache.org/thread/x09gynt90v3hh5sql1gt9dlcn6m6699p





Re: [PR] [SPARK-47050][SQL] Collect and publish partition level metrics for V1 [spark]

2024-04-26 Thread via GitHub


dbtsai commented on code in PR #46188:
URL: https://github.com/apache/spark/pull/46188#discussion_r1581313246


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala:
##
@@ -213,6 +260,14 @@ class BasicWriteJobStatsTracker(
   numFiles += summary.numFiles
   totalNumBytes += summary.numBytes
   totalNumOutput += summary.numRows
+
+  summary.partitionsStats.foreach(s => {

Review Comment:
   nit 
   
   ```
   summary.partitionsStats.foreach { s =>
     // Check if we know the mapping of the internal row to a partition path
     if (partitionsMap.contains(s._1)) {
       val path = partitionsMap(s._1)
       partitionMetrics.update(path, s._2.numBytes, s._2.numRows, s._2.numFiles)
     }
   }
   ```






Re: [PR] [SPARK-47963][CORE] Make the external Spark ecosystem can use structured logging mechanisms [spark]

2024-04-26 Thread via GitHub


gengliangwang commented on PR #46193:
URL: https://github.com/apache/spark/pull/46193#issuecomment-2079790129

   Thanks, merging to master





[PR] [SPARK-48011][Core] Store LogKey name as a value to avoid generating new string instances [spark]

2024-04-26 Thread via GitHub


gengliangwang opened a new pull request, #46249:
URL: https://github.com/apache/spark/pull/46249

   
   
   ### What changes were proposed in this pull request?
   
   
   Store LogKey name as a value to avoid generating new string instances.
   
   ### Why are the changes needed?
   
   To reduce memory usage when getting the names of `LogKey`s.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Existing tests
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No





Re: [PR] [SPARK-48011][Core] Store LogKey name as a value to avoid generating new string instances [spark]

2024-04-26 Thread via GitHub


gengliangwang commented on code in PR #46249:
URL: https://github.com/apache/spark/pull/46249#discussion_r1581359491


##
common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala:
##
@@ -16,10 +16,14 @@
  */
 package org.apache.spark.internal
 
+import java.util.Locale
+
 /**
  * All structured logging `keys` used in `MDC` must be extends `LogKey`
  */
-trait LogKey
+trait LogKey {
+  val name: String = this.toString.toLowerCase(Locale.ROOT)

Review Comment:
   We could also make it a `lazy val` here, but I am not sure whether the cost 
would end up being higher. I suggest keeping it simple for now.






Re: [PR] [SPARK-48011][Core] Store LogKey name as a value to avoid generating new string instances [spark]

2024-04-26 Thread via GitHub


gengliangwang commented on PR #46249:
URL: https://github.com/apache/spark/pull/46249#issuecomment-2079860587

   cc @panbingkun 





Re: [PR] [SPARK-47950] Add Java API Module for Spark Operator [spark-kubernetes-operator]

2024-04-26 Thread via GitHub


dongjoon-hyun commented on code in PR #8:
URL: https://github.com/apache/spark-kubernetes-operator/pull/8#discussion_r1581250016


##
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/InstanceConfig.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.spec;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Config tolerations of executor instances for the application. Used then the target cluster is
+ * lack of batch / gang scheduling This is different from SparkConf: spark.executor.instances
+ *
+ * For example, with below spec: spec: applicationTolerations: instanceConfig: minExecutors: 3
+ * initExecutors: 5 maxExecutors: 10 sparkConf: spark.executor.instances: "10"
+ *
+ * Spark would try to bring up 10 executors as defined in SparkConf. In addition, from SparkApp
+ * perspective, + If Spark app acquires less than 5 executors in given tine window

Review Comment:
   Does this `+` exist for styling?



##
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/InstanceConfig.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.spec;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Config tolerations of executor instances for the application. Used then the target cluster is
+ * lack of batch / gang scheduling This is different from SparkConf: spark.executor.instances
+ *
+ * For example, with below spec: spec: applicationTolerations: instanceConfig: minExecutors: 3
+ * initExecutors: 5 maxExecutors: 10 sparkConf: spark.executor.instances: "10"
+ *
+ * Spark would try to bring up 10 executors as defined in SparkConf. In addition, from SparkApp
+ * perspective, + If Spark app acquires less than 5 executors in given tine window
+ * (.spec.applicationTolerations.applicationTimeoutConfig.executorStartTimeoutMillis) after
+ * submitted, it would be shut down proactively in order to avoid resource deadlock. + Spark app

Review Comment:
   ditto. `+`






Re: [PR] [SPARK-47950] Add Java API Module for Spark Operator [spark-kubernetes-operator]

2024-04-26 Thread via GitHub


dongjoon-hyun commented on code in PR #8:
URL: https://github.com/apache/spark-kubernetes-operator/pull/8#discussion_r1581250381


##
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/InstanceConfig.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.spec;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Config tolerations of executor instances for the application. Used then the target cluster is
+ * lack of batch / gang scheduling This is different from SparkConf: spark.executor.instances
+ *
+ * For example, with below spec: spec: applicationTolerations: instanceConfig: minExecutors: 3
+ * initExecutors: 5 maxExecutors: 10 sparkConf: spark.executor.instances: "10"
+ *
+ * Spark would try to bring up 10 executors as defined in SparkConf. In addition, from SparkApp
+ * perspective, + If Spark app acquires less than 5 executors in given tine window
+ * (.spec.applicationTolerations.applicationTimeoutConfig.executorStartTimeoutMillis) after
+ * submitted, it would be shut down proactively in order to avoid resource deadlock. + Spark app
+ * would be marked as 'RUNNING_WITH_PARTIAL_CAPACITY' if it loses executors after successfully start
+ * up. + Spark app would be marked as 'RunningHealthy' if it has at least min executors after

Review Comment:
   ditto. `+`






[PR] [SPARK-48010][SQL] Avoid repeated calls to conf.resolver in resolveExpression [spark]

2024-04-26 Thread via GitHub


nikhilsheoran-db opened a new pull request, #46248:
URL: https://github.com/apache/spark/pull/46248

   ### What changes were proposed in this pull request?
   - Instead of calling `conf.resolver` on every call in `resolveExpression`, 
this PR reuses a `resolver` obtained once.
   
   ### Why are the changes needed?
   - Consider a view with a large number of columns (in the thousands). In the `RuleExecutor` metrics and flamegraph for a query that only runs `DESCRIBE SELECT * FROM large_view`, a large fraction of the time is spent in `ResolveReferences` and `ResolveRelations`. Most of that driver time goes into initializing `conf` to obtain `conf.resolver` for each column in the view.
   - Since the same `conf` is used in each of these calls, the repeated `conf.resolver` lookups can be avoided by obtaining the resolver once and reusing it.
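
The hoisting pattern described above can be sketched in Python as follows. This is an illustration only, not Spark's Scala code: `Conf`, `resolve_each_time`, and `resolve_hoisted` are hypothetical names, and the lookup counter is an assumption standing in for the per-call cost of obtaining `conf.resolver`.

```python
class Conf:
    """Hypothetical stand-in for a session config; not Spark's SQLConf."""

    def __init__(self, case_sensitive=False):
        self.case_sensitive = case_sensitive
        self.resolver_lookups = 0  # counts how often the resolver is obtained

    @property
    def resolver(self):
        # Each access models the (assumed) cost of building the resolver.
        self.resolver_lookups += 1
        if self.case_sensitive:
            return lambda a, b: a == b
        return lambda a, b: a.lower() == b.lower()


def resolve_each_time(conf, columns, name):
    # Obtains the resolver once per column: one lookup per comparison.
    return [c for c in columns if conf.resolver(c, name)]


def resolve_hoisted(conf, columns, name):
    # Obtains the resolver once, then reuses it for every column.
    resolver = conf.resolver
    return [c for c in columns if resolver(c, name)]
```

With 1,000 column names, the first variant performs 1,000 resolver lookups and the second performs one, while both return the same match.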
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   - Created a dummy view with a large number of columns.
   - Observed the `RuleExecutor` metrics using `RuleExecutor.dumpTimeSpent()` and saw a significant improvement.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No





Re: [PR] [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source [spark]

2024-04-26 Thread via GitHub


chaoqin-li1123 commented on code in PR #45977:
URL: https://github.com/apache/spark/pull/45977#discussion_r1581301878


##
python/pyspark/sql/datasource.py:
##
@@ -469,6 +501,188 @@ def stop(self) -> None:
 ...
 
 
+class SimpleInputPartition(InputPartition):
+    def __init__(self, start: dict, end: dict):
+        self.start = start
+        self.end = end
+
+
+class SimpleDataSourceStreamReader(ABC):
+    """
+    A base class for simplified streaming data source readers. Compared to DataSourceStreamReader,
+    SimpleDataSourceStreamReader doesn't require planning data partitioning. Also, the read api of
+    SimpleDataSourceStreamReader allows reading data and planning the latest offset at the same time.
+
+    .. versionadded: 4.0.0
+    """
+
+    def initialOffset(self) -> dict:
+        """
+        Return the initial offset of the streaming data source.
+        A new streaming query starts reading data from the initial offset.
+        If Spark is restarting an existing query, it will restart from the check-pointed offset
+        rather than the initial one.
+
+        Returns
+        -------
+        dict
+            A dict or recursive dict whose key and value are primitive types, which includes
+            Integer, String and Boolean.
+
+        Examples
+        --------
+        >>> def initialOffset(self):
+        ...     return {"parititon-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}}
+        """
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": "initialOffset"},
+        )
+
+    def read(self, start: dict) -> (Iterator[Tuple], dict):
+        """
+        Read all available data from specified start offset and return the offset that next read attempt
+        starts from.
+
+        Parameters
+        ----------
+        start : dict
+            The start offset to start reading from.
+
+        Returns
+        -------
+        A tuple of an iterator of :class:`Tuple` and a dict\\s
+            The iterator contains all the available records after start offset.
+            The dict is the end of this read attempt and the start of next read attempt.
+        """
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": "read"},
+        )
+
+    def read2(self, start: dict, end: dict) -> Iterator[Tuple]:

Review Comment:
   There can't be two methods named read() in the same class; Python doesn't have method overloading, IIRC.
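
A minimal sketch of the point above, assuming nothing about the final PySpark API: `ShadowedReader`, `CounterReader`, and `read_range` are hypothetical names chosen for illustration. The first class shows that a second `def read` in the same class body simply rebinds the name; the second shows the two operations coexisting under distinct names.

```python
import inspect
from typing import Iterator, Tuple


class ShadowedReader:
    """Two defs with the same name: the second replaces the first."""

    def read(self, start: dict):
        return "read(start)"

    def read(self, start: dict, end: dict):  # rebinds ShadowedReader.read
        return "read(start, end)"


class CounterReader:
    """Hypothetical reader emitting consecutive integers; not part of PySpark."""

    def initialOffset(self) -> dict:
        return {"index": 0}

    def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
        # Return the rows after `start` and the offset the next read starts from.
        end = {"index": start["index"] + 3}
        return self.read_range(start, end), end

    def read_range(self, start: dict, end: dict) -> Iterator[Tuple]:
        # A distinct name, so it can live alongside read(start).
        return iter([(i,) for i in range(start["index"], end["index"])])


# Only the two-argument version survives on ShadowedReader.
surviving_params = list(inspect.signature(ShadowedReader.read).parameters)
```

Calling `ShadowedReader().read({})` raises `TypeError` because only the `(start, end)` version remains, which is why the second method needs its own name.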






Re: [PR] [SPARK-48010][SQL] Avoid repeated calls to conf.resolver in resolveExpression [spark]

2024-04-26 Thread via GitHub


dongjoon-hyun closed pull request #46248: [SPARK-48010][SQL] Avoid repeated 
calls to conf.resolver in resolveExpression
URL: https://github.com/apache/spark/pull/46248





Re: [PR] [SPARK-47351][SQL] Add collation support for StringToMap & Mask string expressions [spark]

2024-04-26 Thread via GitHub


cloud-fan closed pull request #46165: [SPARK-47351][SQL] Add collation support 
for StringToMap & Mask string expressions
URL: https://github.com/apache/spark/pull/46165





Re: [PR] [SPARK-47440][SQL][FOLLOWUP] Reenable predicate pushdown for syntax with boolean comparison in MsSqlServer [spark]

2024-04-26 Thread via GitHub


dongjoon-hyun commented on PR #46236:
URL: https://github.com/apache/spark/pull/46236#issuecomment-2079574626

   cc @stefanbuk-db , too.





Re: [PR] [SPARK-47476][SQL] Support REPLACE function to work with collated strings [spark]

2024-04-26 Thread via GitHub


cloud-fan commented on PR #45704:
URL: https://github.com/apache/spark/pull/45704#issuecomment-2079601554

   The Spark Connect test failure is flaky and unrelated here. I'm merging it to master, thanks!





Re: [PR] [SPARK-47476][SQL] Support REPLACE function to work with collated strings [spark]

2024-04-26 Thread via GitHub


cloud-fan closed pull request #45704: [SPARK-47476][SQL] Support REPLACE 
function to work with collated strings
URL: https://github.com/apache/spark/pull/45704





Re: [PR] [SPARK-47968][SQL] MsSQLServer: Map datatimeoffset to TimestampType [spark]

2024-04-26 Thread via GitHub


yaooqinn commented on PR #46239:
URL: https://github.com/apache/spark/pull/46239#issuecomment-2079629526

   Irrelevant test failure in pyspark connect
   
   ```
   ERROR StatusConsoleListener An exception occurred processing Appender File
   java.lang.IllegalArgumentException: found 1 argument placeholders, but provided 0 for pattern `0, VisitedIndex{visitedIndexes={}}: [] r:0`
       at org.apache.logging.log4j.message.ParameterFormatter.formatMessage(ParameterFormatter.java:233)
       at org.apache.logging.log4j.message.ParameterizedMessage.formatTo(ParameterizedMessage.java:266)
       at org.apache.logging.log4j.core.pattern.MessagePatternConverter$SimpleMessagePatternConverter.format(MessagePatternConverter.java:120)
   ```





Re: [PR] [SPARK-47350][SQL] Add collation support for SplitPart string expression [spark]

2024-04-26 Thread via GitHub


cloud-fan closed pull request #46158: [SPARK-47350][SQL] Add collation support 
for SplitPart string expression
URL: https://github.com/apache/spark/pull/46158





Re: [PR] [SPARK-47968][SQL] MsSQLServer: Map datatimeoffset to TimestampType [spark]

2024-04-26 Thread via GitHub


yaooqinn closed pull request #46239: [SPARK-47968][SQL] MsSQLServer: Map 
datatimeoffset to TimestampType
URL: https://github.com/apache/spark/pull/46239





Re: [PR] [SPARK-47351][SQL] Add collation support for StringToMap & Mask string expressions [spark]

2024-04-26 Thread via GitHub


cloud-fan commented on PR #46165:
URL: https://github.com/apache/spark/pull/46165#issuecomment-2079297307

   thanks, merging to master!





[PR] [Only Test] [spark]

2024-04-26 Thread via GitHub


panbingkun opened a new pull request, #46246:
URL: https://github.com/apache/spark/pull/46246

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   




