[PR] [MINOR][PYTHON][TESTS] Make `test_creation_index` deterministic [spark]

2024-04-23 Thread via GitHub


zhengruifeng opened a new pull request, #46200:
URL: https://github.com/apache/spark/pull/46200

   ### What changes were proposed in this pull request?
   Make `test_creation_index` deterministic
   
   
   ### Why are the changes needed?
   It may fail in some environments:
   ```
   FAIL [16.261s]: test_creation_index (pyspark.pandas.tests.frame.test_constructor.FrameConstructorTests.test_creation_index)
   --
   Traceback (most recent call last):
     File "/home/jenkins/python/pyspark/testing/pandasutils.py", line 91, in _assert_pandas_equal
       assert_frame_equal(
     File "/databricks/python3/lib/python3.11/site-packages/pandas/_testing/asserters.py", line 1257, in assert_frame_equal
       assert_index_equal(
     File "/databricks/python3/lib/python3.11/site-packages/pandas/_testing/asserters.py", line 407, in assert_index_equal
       raise_assert_detail(obj, msg, left, right)
     File "/databricks/python3/lib/python3.11/site-packages/pandas/_testing/asserters.py", line 665, in raise_assert_detail
       raise AssertionError(msg)
   AssertionError: DataFrame.index are different
   DataFrame.index values are different (40.0 %)
   [left]:  Int64Index([2, 3, 4, 6, 5], dtype='int64')
   [right]: Int64Index([2, 3, 4, 5, 6], dtype='int64')
   ```
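   
   A minimal sketch of the idea behind the fix (assumed here, since the diff itself is not shown): when row order is not guaranteed, sort both sides before comparing so the assertion no longer depends on the environment.
   
   ```python
   import pandas as pd

   # Hypothetical illustration: the same rows collected in a different order.
   left = pd.DataFrame({"a": [10, 20, 30, 40, 50]}, index=[2, 3, 4, 6, 5])
   right = pd.DataFrame({"a": [10, 20, 30, 50, 40]}, index=[2, 3, 4, 5, 6])

   # Comparing after sort_index() removes the order dependence that makes
   # the index assertion above flaky.
   pd.testing.assert_frame_equal(left.sort_index(), right.sort_index())
   ```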
   
   
   ### Does this PR introduce _any_ user-facing change?
   no. test only
   
   
   ### How was this patch tested?
   ci
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   no


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47948][PYTHON] Upgrade the minimum `Pandas` version to 2.0.0 [spark]

2024-04-23 Thread via GitHub


itholic commented on code in PR #46175:
URL: https://github.com/apache/spark/pull/46175#discussion_r1577236035


##
dev/create-release/spark-rm/Dockerfile:
##
@@ -37,7 +37,7 @@ ENV DEBCONF_NONINTERACTIVE_SEEN true
 # These arguments are just for reuse and not really meant to be customized.
 ARG APT_INSTALL="apt-get install --no-install-recommends -y"
 
-ARG PIP_PKGS="sphinx==4.5.0 mkdocs==1.1.2 numpy==1.20.3 pydata_sphinx_theme==0.13.3 ipython==7.19.0 nbsphinx==0.8.0 numpydoc==1.1.0 jinja2==3.1.2 twine==3.4.1 sphinx-plotly-directive==0.1.3 sphinx-copybutton==0.5.2 pandas==1.5.3 pyarrow==10.0.1 plotly==5.4.0 markupsafe==2.0.1 docutils<0.17 grpcio==1.62.0 protobuf==4.21.6 grpcio-status==1.62.0 googleapis-common-protos==1.56.4"
+ARG PIP_PKGS="sphinx==4.5.0 mkdocs==1.1.2 numpy==1.20.3 pydata_sphinx_theme==0.13.3 ipython==7.19.0 nbsphinx==0.8.0 numpydoc==1.1.0 jinja2==3.1.2 twine==3.4.1 sphinx-plotly-directive==0.1.3 sphinx-copybutton==0.5.2 pandas==2.0.3 pyarrow==10.0.1 plotly==5.4.0 markupsafe==2.0.1 docutils<0.17 grpcio==1.62.0 protobuf==4.21.6 grpcio-status==1.62.0 googleapis-common-protos==1.56.4"

Review Comment:
   Yeah, at least from my local envs they all pass the test suites.
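   
   For reference, a tiny sanity check one might run in the built image (illustrative only; the authoritative minimum comes from SPARK-47948):
   
   ```python
   # Confirms the release image picked up a pandas that satisfies the new
   # 2.0.0 minimum; `packaging` is assumed to be available in the image.
   from packaging.version import Version
   import pandas as pd

   assert Version(pd.__version__) >= Version("2.0.0"), pd.__version__
   ```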






Re: [PR] [SPARK-47914][SQL] Do not display the splits parameter in Rang [spark]

2024-04-23 Thread via GitHub


guixiaowen commented on PR #46136:
URL: https://github.com/apache/spark/pull/46136#issuecomment-2073986240

   > Mind checking the test failures?
   
   @HyukjinKwon  I made the changes. Do you have any other questions?





Re: [PR] [SPARK-47764][CORE][SQL] Cleanup shuffle dependencies based on ShuffleCleanupMode [spark]

2024-04-23 Thread via GitHub


bozhang2820 commented on code in PR #45930:
URL: https://github.com/apache/spark/pull/45930#discussion_r1577219567


##
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##
@@ -869,6 +874,8 @@ case class AdaptiveExecutionContext(session: SparkSession, 
qe: QueryExecution) {
*/
   val stageCache: TrieMap[SparkPlan, ExchangeQueryStageExec] =
 new TrieMap[SparkPlan, ExchangeQueryStageExec]()
+
+  val shuffleIds: TrieMap[Int, Boolean] = new TrieMap[Int, Boolean]()

Review Comment:
   Updated






Re: [PR] [SPARK-47953][DOCS] MsSQLServer: Document Mapping Spark SQL Data Types to Microsoft SQL Server [spark]

2024-04-23 Thread via GitHub


yaooqinn commented on PR #46177:
URL: https://github.com/apache/spark/pull/46177#issuecomment-2073972978

   Thank you @dongjoon-hyun 





Re: [PR] [MINOR][DOCS] Add `docs/_generated/` to .gitignore [spark]

2024-04-23 Thread via GitHub


yaooqinn commented on PR #46178:
URL: https://github.com/apache/spark/pull/46178#issuecomment-2073972776

   Thank you @dongjoon-hyun @zhengruifeng 





Re: [PR] [SPARK-47949][SQL][DOCKER][TESTS] MsSQLServer: Bump up mssql docker image version to 2022-CU12-GDR1-ubuntu-22.04 [spark]

2024-04-23 Thread via GitHub


yaooqinn commented on PR #46176:
URL: https://github.com/apache/spark/pull/46176#issuecomment-2073972487

   Thank you @dongjoon-hyun 





Re: [PR] [SPARK-47963][CORE][TESTS] Add an external LogKey usage case in UT [spark]

2024-04-23 Thread via GitHub


LuciferYang commented on code in PR #46193:
URL: https://github.com/apache/spark/pull/46193#discussion_r1577206238


##
common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala:
##
@@ -144,6 +146,22 @@ trait LoggingSuiteBase
   }
   }
 
+  // An enumeration value that is not defined in `LogKey`, you can define it anywhere
+  private val externalLogKey = LogKey.VALUE
+  private val externalLog = log"${MDC(externalLogKey, "External log message.")}"
+  test("Logging with external LogKey") {

Review Comment:
   Friendly ping @gengliangwang, what's your take on this? Is `StructuredLogging` a feature that users can utilize? If so, how can we make it easier for users to extend the definition of `LogKey`?
   






Re: [PR] [SPARK-47771][PYTHON][DOCS][TESTS][FOLLOWUP] Make `max_by, min_by` doctests deterministic [spark]

2024-04-23 Thread via GitHub


zhengruifeng commented on PR #46196:
URL: https://github.com/apache/spark/pull/46196#issuecomment-2073941487

   thanks @LuciferYang and @HyukjinKwon 
   
   merged to master





Re: [PR] [SPARK-47771][PYTHON][DOCS][TESTS][FOLLOWUP] Make `max_by, min_by` doctests deterministic [spark]

2024-04-23 Thread via GitHub


zhengruifeng closed pull request #46196: 
[SPARK-47771][PYTHON][DOCS][TESTS][FOLLOWUP] Make `max_by, min_by` doctests 
deterministic
URL: https://github.com/apache/spark/pull/46196





Re: [PR] [SPARK-47963][CORE][TESTS] Add an external LogKey usage case in UT [spark]

2024-04-23 Thread via GitHub


panbingkun commented on PR #46193:
URL: https://github.com/apache/spark/pull/46193#issuecomment-2073933759

   Wait a moment, I'm checking





Re: [PR] [SPARK-47963][CORE][TESTS] Add an external LogKey usage case in UT [spark]

2024-04-23 Thread via GitHub


LuciferYang commented on code in PR #46193:
URL: https://github.com/apache/spark/pull/46193#discussion_r1577159936


##
common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala:
##
@@ -144,6 +146,22 @@ trait LoggingSuiteBase
   }
   }
 
+  // An enumeration value that is not defined in `LogKey`, you can define it anywhere
+  private val externalLogKey = LogKey.VALUE
+  private val externalLog = log"${MDC(externalLogKey, "External log message.")}"
+  test("Logging with external LogKey") {

Review Comment:
   For this case, the key for `externalLog.context` will always be ~an empty string~ "value". Is this expected?






Re: [PR] [SPARK-47963][CORE][TESTS] Add an external LogKey usage case in UT [spark]

2024-04-23 Thread via GitHub


LuciferYang commented on code in PR #46193:
URL: https://github.com/apache/spark/pull/46193#discussion_r1577167593


##
common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala:
##
@@ -144,6 +146,22 @@ trait LoggingSuiteBase
   }
   }
 
+  // An enumeration value that is not defined in `LogKey`, you can define it anywhere
+  private val externalLogKey = LogKey.VALUE
+  private val externalLog = log"${MDC(externalLogKey, "External log message.")}"
+  test("Logging with external LogKey") {

Review Comment:
   In https://github.com/apache/spark/pull/46186/files, I tried to relax the type of `MDC.key` to `Enumeration#Value`, which allows other enumeration types to be used as `MDC.key` and adds a bit of extensibility.
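   
   For illustration, a rough sketch of what that extensibility could look like from an external module, assuming `MDC` accepted any `Enumeration#Value` (the package names and interpolator below follow the diff above and may differ from what finally lands):
   
   ```scala
   import org.apache.spark.internal.{Logging, MDC}

   // Hypothetical user-defined enumeration reused as structured-logging keys.
   object MyAppLogKeys extends Enumeration {
     val REQUEST_ID, TENANT = Value
   }

   class MyService extends Logging {
     def handle(requestId: String): Unit = {
       // Same log"..." / MDC pattern as the test in this PR, but with a key
       // that lives outside Spark's LogKey enumeration.
       logInfo(log"Handling request ${MDC(MyAppLogKeys.REQUEST_ID, requestId)}")
     }
   }
   ```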
   
   
   
   






Re: [PR] [SPARK-47964][PYTHON][CONNECT] Hide SQLContext and HiveContext in pyspark-connect [spark]

2024-04-23 Thread via GitHub


HyukjinKwon commented on code in PR #46194:
URL: https://github.com/apache/spark/pull/46194#discussion_r1577167281


##
python/pyspark/__init__.py:
##
@@ -125,8 +125,10 @@ def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
 # for backward compatibility references.
 sys.modules["pyspark.context"] = context
 
-# for back compatibility
-from pyspark.sql import SQLContext, HiveContext, Row  # noqa: F401
+# for back compatibility
+from pyspark.sql import SQLContext, HiveContext  # noqa: F401
+
+from pyspark.sql import Row

Review Comment:
   ```suggestion
   from pyspark.sql import Row  # noqa: F401
   ```







Re: [PR] [SPARK-47933][PYTHON][CONNECT][FOLLOW-UP] Add a check of `__name__` at `_with_origin` [spark]

2024-04-23 Thread via GitHub


HyukjinKwon commented on PR #46198:
URL: https://github.com/apache/spark/pull/46198#issuecomment-2073905817

   Merged to master.





Re: [PR] [SPARK-47933][PYTHON][CONNECT][FOLLOW-UP] Add a check of `__name__` at `_with_origin` [spark]

2024-04-23 Thread via GitHub


HyukjinKwon closed pull request #46198: 
[SPARK-47933][PYTHON][CONNECT][FOLLOW-UP] Add a check of `__name__` at 
`_with_origin`
URL: https://github.com/apache/spark/pull/46198





[PR] [SPARK-47933][PYTHON][CONNECT][FOLLOW-UP] Add a check of `__name__` at `_with_origin` [spark]

2024-04-23 Thread via GitHub


HyukjinKwon opened a new pull request, #46198:
URL: https://github.com/apache/spark/pull/46198

   ### What changes were proposed in this pull request?
   
   
   This PR is a followup of https://github.com/apache/spark/pull/46155 that 
adds check of `__name__` at `_with_origin`.
   
   ### Why are the changes needed?
   
   It is possible for a callable instance without a `__name__` and/or `__module__` attribute to be wrapped, for example, `functools.partial`.
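   
   A small illustration of the failure mode (hypothetical helper, not the actual patch): `functools.partial` objects carry no `__name__`, so a wrapper has to read such attributes defensively.
   
   ```python
   import functools


   def describe(func):
       # Fall back gracefully when the callable lacks __name__ / __module__.
       name = getattr(func, "__name__", repr(func))
       module = getattr(func, "__module__", "<unknown>")
       return f"{module}.{name}"


   def add(a, b):
       return a + b


   print(describe(add))                        # plain function: has both
   print(describe(functools.partial(add, 1)))  # partial: no __name__
   ```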
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   `./bin/pyspark`
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.





Re: [PR] [SPARK-47963][CORE][TESTS] Add an external LogKey usage case in UT [spark]

2024-04-23 Thread via GitHub


LuciferYang commented on code in PR #46193:
URL: https://github.com/apache/spark/pull/46193#discussion_r1577159936


##
common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala:
##
@@ -144,6 +146,22 @@ trait LoggingSuiteBase
   }
   }
 
+  // An enumeration value that is not defined in `LogKey`, you can define it anywhere
+  private val externalLogKey = LogKey.VALUE
+  private val externalLog = log"${MDC(externalLogKey, "External log message.")}"
+  test("Logging with external LogKey") {

Review Comment:
   For this case, the key for `externalLog.context` will be an empty string. Is 
this expected?






Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


pan3793 commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1577131302


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala:
##
@@ -73,7 +73,8 @@ private[spark] class KubernetesDriverBuilder {
   new HadoopConfDriverFeatureStep(conf),
   new KerberosConfDriverFeatureStep(conf),
   new PodTemplateConfigMapStep(conf),
-  new LocalDirsFeatureStep(conf)) ++ userFeatures
+  new LocalDirsFeatureStep(conf),
+  new DriverIngressFeatureStep(conf)) ++ userFeatures

Review Comment:
   that's a good point. 
   
   yes, any failure during resource creation will fail the job submission at 
the current stage, and I did see the ingress creation failure cases internally. 
we mitigate it by providing a switch to allow users to disable creating ingress 
for certain jobs by `spark.kubernetes.driver.ingress.enabled`.
   
   currently, we create resources in batches,
   
   
https://github.com/apache/spark/blob/48e207f4a2192d474f2c0f141b984ef0c36a78c3/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L161-L181
   
   If we want to make some resources optional, we need to add a new batch to handle them independently. Meanwhile, a new method may need to be added to `KubernetesFeatureConfigStep`, something like
   
   ```
   trait KubernetesFeatureConfigStep {
     ...
     def getAdditionalOptionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
   }
   ```
   
   @dongjoon-hyun WDYT?






[PR] [SPARK-47965][CORE] Avoid orNull in TypedConfigBuilder [spark]

2024-04-23 Thread via GitHub


HyukjinKwon opened a new pull request, #46197:
URL: https://github.com/apache/spark/pull/46197

   ### What changes were proposed in this pull request?
   
   This PR proposes to avoid orNull in `TypedConfigBuilder`. Keys and values 
cannot be set `null` anyway, see `RuntimeConfig` and `SparkConf`.
   
   ### Why are the changes needed?
   
   For code cleanup.
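   
   For illustration, the general shape of the cleanup (a generic sketch, not the actual `TypedConfigBuilder` code): keep the `Option` explicit instead of converting it to `null` with `orNull` and guarding against `null` later.
   
   ```scala
   // Before-style: null leaks out and must be checked at every use site.
   val legacyHome: String = sys.env.get("SPARK_HOME").orNull

   // After-style: absence is handled once, explicitly.
   val sparkHome: String = sys.env.get("SPARK_HOME").getOrElse("<unset>")
   ```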
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   CI in this PR should verify them.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.





Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


pan3793 commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1577138226


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala:
##
@@ -73,7 +73,8 @@ private[spark] class KubernetesDriverBuilder {
   new HadoopConfDriverFeatureStep(conf),
   new KerberosConfDriverFeatureStep(conf),
   new PodTemplateConfigMapStep(conf),
-  new LocalDirsFeatureStep(conf)) ++ userFeatures
+  new LocalDirsFeatureStep(conf),
+  new DriverIngressFeatureStep(conf)) ++ userFeatures

Review Comment:
   on the other hand, silently ignoring a resource that the user requested may 
confuse users. for example, if something goes wrong during the spark UI start, 
we fail the job immediately.






Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


pan3793 commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1577149932


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala:
##
@@ -108,5 +101,5 @@ private[spark] object DriverServiceFeatureStep {
   val DRIVER_BIND_ADDRESS_KEY = config.DRIVER_BIND_ADDRESS.key
   val DRIVER_HOST_KEY = config.DRIVER_HOST_ADDRESS.key
   val DRIVER_SVC_POSTFIX = "-driver-svc"
-  val MAX_SERVICE_NAME_LENGTH = KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH
+  val MAX_SERVICE_NAME_LENGTH = KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH - "-ingress".length

Review Comment:
   Makes sense; we can generate the ingress name independently to avoid such a regression.
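   
   A rough sketch of generating the ingress name independently (names assumed, not the actual patch), so the service-name length budget is left untouched:
   
   ```scala
   // RFC 1123 DNS label limit that Kubernetes object names must respect.
   val KubernetesDnsLabelNameMaxLength = 63

   def ingressName(appId: String): String = {
     val suffix = "-ingress"
     // Truncate the app id, not the suffix, if the combination is too long.
     appId.take(KubernetesDnsLabelNameMaxLength - suffix.length) + suffix
   }
   ```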






Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


pan3793 commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1577131302


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala:
##
@@ -73,7 +73,8 @@ private[spark] class KubernetesDriverBuilder {
   new HadoopConfDriverFeatureStep(conf),
   new KerberosConfDriverFeatureStep(conf),
   new PodTemplateConfigMapStep(conf),
-  new LocalDirsFeatureStep(conf)) ++ userFeatures
+  new LocalDirsFeatureStep(conf),
+  new DriverIngressFeatureStep(conf)) ++ userFeatures

Review Comment:
   that's a good point. 
   
   yes, any failure during resource creation will fail the job submission at 
the current stage, and I did see the ingress creation failure cases internally. 
we mitigate it by providing a switch to allow users to disable creating ingress 
for certain jobs by `spark.kubernetes.driver.ingress.enabled`.
   
   currently, we create resources in batches,
   
   
https://github.com/apache/spark/blob/48e207f4a2192d474f2c0f141b984ef0c36a78c3/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L161-L181
   
   If we want to make some resources optional, we need to add a new batch to handle them independently. Meanwhile, a new method may need to be added to `KubernetesFeatureConfigStep`, something like
   
   ```
   trait KubernetesFeatureConfigStep {
     ...
     def getAdditionalOptionalKubernetesResources(): Seq[Option[HasMetadata]] = Seq.empty
   }
   ```
   
   @dongjoon-hyun WDYT?






Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


pan3793 commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1577131302


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala:
##
@@ -73,7 +73,8 @@ private[spark] class KubernetesDriverBuilder {
   new HadoopConfDriverFeatureStep(conf),
   new KerberosConfDriverFeatureStep(conf),
   new PodTemplateConfigMapStep(conf),
-  new LocalDirsFeatureStep(conf)) ++ userFeatures
+  new LocalDirsFeatureStep(conf),
+  new DriverIngressFeatureStep(conf)) ++ userFeatures

Review Comment:
   that's a good point. 
   
   yes, any failure during resource creation will fail the job submission at 
the current stage, and I did see the ingress creation failure cases internally. 
we mitigate it by providing a switch to allow users to disable creating ingress 
for certain jobs `spark.kubernetes.driver.ingress.enabled`.
   
   currently, we create resources in batches,
   
   
https://github.com/apache/spark/blob/48e207f4a2192d474f2c0f141b984ef0c36a78c3/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L161-L181
   
   If we want to make some resources optional, we need to add a new batch to handle them independently. Meanwhile, a new method may need to be added to `KubernetesFeatureConfigStep`, something like
   
   ```
   trait KubernetesFeatureConfigStep {
     ...
     def getAdditionalOptionalKubernetesResources(): Seq[Option[HasMetadata]] = Seq.empty
   }
   ```
   
   @dongjoon-hyun WDYT?






Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


pan3793 commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1577138226


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala:
##
@@ -73,7 +73,8 @@ private[spark] class KubernetesDriverBuilder {
   new HadoopConfDriverFeatureStep(conf),
   new KerberosConfDriverFeatureStep(conf),
   new PodTemplateConfigMapStep(conf),
-  new LocalDirsFeatureStep(conf)) ++ userFeatures
+  new LocalDirsFeatureStep(conf),
+  new DriverIngressFeatureStep(conf)) ++ userFeatures

Review Comment:
   On the other hand, silently ignoring a resource that the user requested may confuse users. For example, if something goes wrong during the Spark UI start, we fail the job immediately.






Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


pan3793 commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1577134736


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala:
##
@@ -87,16 +77,19 @@ private[spark] class DriverServiceFeatureStep(
   .withName(DRIVER_PORT_NAME)
   .withPort(driverPort)
   .withNewTargetPort(driverPort)
+  .withProtocol("TCP")

Review Comment:
   OK, thanks for advising, will remove it later






Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


pan3793 commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1577131302


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala:
##
@@ -73,7 +73,8 @@ private[spark] class KubernetesDriverBuilder {
   new HadoopConfDriverFeatureStep(conf),
   new KerberosConfDriverFeatureStep(conf),
   new PodTemplateConfigMapStep(conf),
-  new LocalDirsFeatureStep(conf)) ++ userFeatures
+  new LocalDirsFeatureStep(conf),
+  new DriverIngressFeatureStep(conf)) ++ userFeatures

Review Comment:
   that's a good point. 
   
   yes, any failure during resource creation will cause the job submission 
failure at the current stage, and I did see the ingress creation failure cases 
internally. we mitigate it by providing a switch to allow users to disable 
creating ingress for certain jobs `spark.kubernetes.driver.ingress.enabled`.
   
   currently, we create resources in batches,
   
   
https://github.com/apache/spark/blob/48e207f4a2192d474f2c0f141b984ef0c36a78c3/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L161-L181
   
   If we want to make some resources optional, we need to add a new batch to handle them independently. Meanwhile, a new method may need to be added to `KubernetesFeatureConfigStep`, something like
   
   ```
   trait KubernetesFeatureConfigStep {
     ...
     def getAdditionalOptionalKubernetesResources(): Seq[Option[HasMetadata]] = Seq.empty
   }
   ```
   
   @dongjoon-hyun WDYT?






Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


pan3793 commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1577106837


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverIngressFeatureStep.scala:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model.HasMetadata
+import io.fabric8.kubernetes.api.model.networking.v1.{HTTPIngressPathBuilder, 
IngressBuilder, IngressRuleBuilder, ServiceBackendPortBuilder}
+
+import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, 
SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants.SPARK_APP_ID_LABEL
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.UI._
+
+class DriverIngressFeatureStep(kubernetesConf: KubernetesDriverConf)
+  extends KubernetesFeatureConfigStep with Logging {
+
+  private lazy val driverServiceName: String = kubernetesConf.driverServiceName
+
+  override def configurePod(pod: SparkPod): SparkPod = pod
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+if (!kubernetesConf.get(UI_ENABLED) || 
!kubernetesConf.get(KUBERNETES_INGRESS_ENABLED)) {
+  logInfo(
+s"Skip building ingress for Spark UI, due to" +
+  s"${UI_ENABLED.key} or ${KUBERNETES_INGRESS_ENABLED.key} is false.")
+  return Seq.empty
+}
+
+val appId = kubernetesConf.appId
+val uiPort = kubernetesConf.get(UI_PORT)
+val ingressHost = kubernetesConf.get(KUBERNETES_INGRESS_HOST_PATTERN) 
match {
+  case Some(ingressHostPattern) =>
+ingressHostPattern.replace("{{APP_ID}}", appId)
+  case None =>
+logWarning(s"Skip building ingress for Spark UI, due to " +
+  s"${KUBERNETES_INGRESS_HOST_PATTERN.key} is absent.")
+return Seq.empty
+}
+
+val customLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+  kubernetesConf.sparkConf,
+  KUBERNETES_DRIVER_INGRESS_LABEL_PREFIX)
+val labels = customLabels ++ Map(SPARK_APP_ID_LABEL -> appId)
+
+val annotations = KubernetesUtils.parsePrefixedKeyValuePairs(
+  kubernetesConf.sparkConf,
+  KUBERNETES_DRIVER_INGRESS_ANNOTATION_PREFIX)
+
+val path = new HTTPIngressPathBuilder()
+  .withPath(kubernetesConf.get(KUBERNETES_INGRESS_PATH))
+  .withPathType(kubernetesConf.get(KUBERNETES_INGRESS_PATH_TYPE))
+  .withNewBackend()
+.withNewService()
+  .withName(driverServiceName)
+  .withPort(new ServiceBackendPortBuilder().withNumber(uiPort).build())
+.endService()
+  .endBackend()
+  .build()
+
+val uiRule = new IngressRuleBuilder()
+  .withHost(ingressHost)
+  .withNewHttp()
+.addToPaths(path)
+  .endHttp()
+  .build()
+
+val ingress = new IngressBuilder()

Review Comment:
   yes, `OwnerReference` is handled at
   
https://github.com/apache/spark/blob/48e207f4a2192d474f2c0f141b984ef0c36a78c3/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L175
   
   I checked that deleting the driver pod removes the ingress too






[PR] [SPARK-47771][PYTHON][DOCS][TESTS][FOLLOWUP] Make `max_by, min_by` doctests deterministic [spark]

2024-04-23 Thread via GitHub


zhengruifeng opened a new pull request, #46196:
URL: https://github.com/apache/spark/pull/46196

   ### What changes were proposed in this pull request?
   Make `max_by, min_by` doctests deterministic
   
   
   ### Why are the changes needed?
   https://github.com/apache/spark/pull/45939 fixed this issue by sorting the rows; unfortunately, it is not enough:
   
   In group `department=Finance`, the two rows `("Finance", "Frank", 5)` and `("Finance", "George", 5)` have the same value `years_in_dept=5`, so `min_by("name", "years_in_dept")` and `max_by("name", "years_in_dept")` are still non-deterministic.
   
   This test failed in some environments:
   ```
   **
   File "/home/jenkins/python/pyspark/sql/connect/functions/builtin.py", line 1177, in pyspark.sql.connect.functions.builtin.max_by
   Failed example:
       df.groupby("department").agg(
           sf.max_by("name", "years_in_dept")
       ).sort("department").show()
   Expected:
       +----------+---------------------------+
       |department|max_by(name, years_in_dept)|
       +----------+---------------------------+
       |   Consult|                      Henry|
       |   Finance|                     George|
       +----------+---------------------------+
   Got:
       +----------+---------------------------+
       |department|max_by(name, years_in_dept)|
       +----------+---------------------------+
       |   Consult|                      Henry|
       |   Finance|                      Frank|
       +----------+---------------------------+

   **
   File "/home/jenkins/python/pyspark/sql/connect/functions/builtin.py", line 1205, in pyspark.sql.connect.functions.builtin.min_by
   Failed example:
       df.groupby("department").agg(
           sf.min_by("name", "years_in_dept")
       ).sort("department").show()
   Expected:
       +----------+---------------------------+
       |department|min_by(name, years_in_dept)|
       +----------+---------------------------+
       |   Consult|                        Eva|
       |   Finance|                     George|
       +----------+---------------------------+
   Got:
       +----------+---------------------------+
       |department|min_by(name, years_in_dept)|
       +----------+---------------------------+
       |   Consult|                        Eva|
       |   Finance|                      Frank|
       +----------+---------------------------+

   **
   ```
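   
   A sketch of the kind of change the description implies (the actual diff is not shown here): break the tie in `years_in_dept` for the Finance group so `max_by`/`min_by` have a unique answer and the doctest output is deterministic.
   
   ```python
   from pyspark.sql import SparkSession, functions as sf

   spark = SparkSession.builder.getOrCreate()

   # Hypothetical data: 5 vs 9 instead of 5 vs 5, so there is no tie.
   df = spark.createDataFrame(
       [("Consult", "Eva", 6), ("Consult", "Henry", 7),
        ("Finance", "Frank", 5), ("Finance", "George", 9)],
       ["department", "name", "years_in_dept"],
   )

   df.groupby("department").agg(
       sf.max_by("name", "years_in_dept")
   ).sort("department").show()
   ```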
   
   
   ### Does this PR introduce _any_ user-facing change?
   no
   
   
   ### How was this patch tested?
   ci
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   no
   





Re: [PR] [WIP][SPARK-24815] [CORE] Trigger Interval based DRA for Structured Streaming [spark]

2024-04-23 Thread via GitHub


pkotikalapudi commented on code in PR #42352:
URL: https://github.com/apache/spark/pull/42352#discussion_r1577078911


##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -340,6 +385,45 @@ private[spark] class ExecutorAllocationManager(
 }
   }
 
+  /**
+   * Maximum number of executors to be removed per dra evaluation.
+   *
+   * This function limits the number of idle executors to be removed if the
+   * `streamingDRAFeatureEnabled` flag is enabled, otherwise it returns all 
the idle executors.
+   * The number of executors to be removed is based on
+   * the de-allocation ratio (`executorDeallocationRatio`) and timeout 
(`removeTime`).
+   * It helps in removing only few executors per evaluation cycle and helps in 
gradual removal of
+   * executors across micro-batches.
+   */
+  private def maxExecutorDeallocationsPerEvaluation(
+      timedOutExecs: Seq[(String, Int)]): Seq[(String, Int)] = {
+if (!streamingDRAFeatureEnabled) {
+  timedOutExecs
+}
+else {
+  val currentTime = clock.nanoTime()
+
+  if (removeTime == NOT_SET) {

Review Comment:
   Good catch. I will update it to `Seq.empty[(String, Int)]`. Every evaluation happens at a fixed rate of a few hundred milliseconds, so all those executors will be evaluated in the next round.






[PR] [SPARK-47964][PYTHON][CONNECT] Hide SQLContext and HiveContext in pyspark-connect [spark]

2024-04-23 Thread via GitHub


HyukjinKwon opened a new pull request, #46194:
URL: https://github.com/apache/spark/pull/46194

   ### What changes were proposed in this pull request?
   
   This PR hides SQLContext and HiveContext in `pyspark-connect`. They do not 
exist there.
   
   ### Why are the changes needed?
   
   To recover https://github.com/apache/spark/actions/runs/8805830494
   
   ### Does this PR introduce _any_ user-facing change?
   
   No, the main change has not been released yet.
   
   ### How was this patch tested?
   
   Manually.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.





Re: [PR] [SPARK-45302][PYTHON] Remove PID communication between Python workers when no demon is used [spark]

2024-04-23 Thread via GitHub


HyukjinKwon commented on PR #43087:
URL: https://github.com/apache/spark/pull/43087#issuecomment-2073737492

   I am going to revert this for now - I am cleaning up and refactoring 
workers. I will bring this back in again later.





Re: [PR] [WIP][SPARK-24815] [CORE] Trigger Interval based DRA for Structured Streaming [spark]

2024-04-23 Thread via GitHub


pkotikalapudi commented on code in PR #42352:
URL: https://github.com/apache/spark/pull/42352#discussion_r1577058026


##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -340,6 +385,45 @@ private[spark] class ExecutorAllocationManager(
 }
   }
 
+  /**
+   * Maximum number of executors to be removed per dra evaluation.

Review Comment:
   It can be useful for batch jobs as well. Like the allocation ratio, the de-allocation ratio is also relevant for scaling out/back only a few executors at a time, especially in cases where the stages of a batch job have alternating high/low task counts.
   
   I restricted this behind `streamingDRAFeatureEnabled` initially for feature 
distinction. I can modify the code to have this as a config which can be used 
in traditional DRA as well.






Re: [PR] [MINOR][DOCS] Fix type hint of 3 functions [spark]

2024-04-23 Thread via GitHub


zhengruifeng commented on PR #46179:
URL: https://github.com/apache/spark/pull/46179#issuecomment-2073713053

   thank you @dongjoon-hyun for reviews





Re: [PR] [SPARK-47921][CONNECT] Fix ExecuteJobTag creation in ExecuteHolder [spark]

2024-04-23 Thread via GitHub


ueshin commented on code in PR #46140:
URL: https://github.com/apache/spark/pull/46140#discussion_r1577043660


##
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala:
##
@@ -37,7 +37,7 @@ class SparkConnectServerListenerSuite
 
   private var kvstore: ElementTrackingStore = _
 
-  private val jobTag = ExecuteJobTag("sessionId", "userId", "operationId")
+  private val jobTag = ExecuteJobTag("userId", "sessionId", "operationId")

Review Comment:
   The original fix is 
https://github.com/apache/spark/commit/dd6636b4e076c1c7fd3569ad509d28524a98d156.
   I'm fine with it too, but I asked https://github.com/apache/spark/pull/46140#issuecomment-2073342960 because I was worried about the inconsistency with the similar API.






Re: [PR] [SPARK-47764][CORE][SQL] Cleanup shuffle dependencies based on ShuffleCleanupMode [spark]

2024-04-23 Thread via GitHub


bozhang2820 commented on code in PR #45930:
URL: https://github.com/apache/spark/pull/45930#discussion_r1575960906


##
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala:
##
@@ -187,6 +187,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) 
extends ShuffleManager
 shuffleBlockResolver.removeDataByMap(shuffleId, mapTaskId)
   }
 }
+shuffleBlockResolver.removeShuffleToSkip(shuffleId)

Review Comment:
   Yeah this is a bit weird... Changed to use a Guava cache with a fixed 
maximum size (1000) instead, so that we do not need to do cleanups for 
shufflesToSkip. 
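   
   A minimal sketch of that approach (names and types assumed): a size-bounded Guava cache used as a "set" of shuffle ids, so older entries are evicted automatically instead of being cleaned up explicitly.
   
   ```scala
   import com.google.common.cache.CacheBuilder

   // Bounded at 1000 entries; older entries fall out on their own.
   val shufflesToSkip = CacheBuilder.newBuilder()
     .maximumSize(1000)
     .build[java.lang.Integer, java.lang.Boolean]()

   def markSkip(shuffleId: Int): Unit = shufflesToSkip.put(shuffleId, true)

   def shouldSkip(shuffleId: Int): Boolean =
     Option(shufflesToSkip.getIfPresent(shuffleId)).exists(_.booleanValue())
   ```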






Re: [PR] [SPARK-47963][CORE][TESTS] Add an external LogKey usage case in UT [spark]

2024-04-23 Thread via GitHub


panbingkun commented on PR #46193:
URL: https://github.com/apache/spark/pull/46193#issuecomment-2073684346

   cc @gengliangwang @LuciferYang 





[PR] [SPARK-47963][CORE][TESTS] Add an external LogKey usage case in UT [spark]

2024-04-23 Thread via GitHub


panbingkun opened a new pull request, #46193:
URL: https://github.com/apache/spark/pull/46193

   ### What changes were proposed in this pull request?
   The PR aims to add an external `LogKey` usage case in a unit test.
   
   
   ### Why are the changes needed?
   Give an example to developers who use the `LogKey` mechanism in an external system.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   - Manually test.
   - Pass GA.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   





Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]

2024-04-23 Thread via GitHub


HyukjinKwon commented on PR #46125:
URL: https://github.com/apache/spark/pull/46125#issuecomment-2073665401

   Merged to master.





Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]

2024-04-23 Thread via GitHub


HyukjinKwon closed pull request #46125: [SPARK-47941] [SS] [Connect] Propagate 
ForeachBatch worker initialization errors to users for PySpark
URL: https://github.com/apache/spark/pull/46125





Re: [PR] [WIP][SPARK-24815] [CORE] Trigger Interval based DRA for Structured Streaming [spark]

2024-04-23 Thread via GitHub


pkotikalapudi commented on code in PR #42352:
URL: https://github.com/apache/spark/pull/42352#discussion_r1577025511


##
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##
@@ -96,6 +96,30 @@ import org.apache.spark.util.{Clock, SystemClock, 
ThreadUtils, Utils}
  * If an executor with caching data blocks has been idle for more than 
this duration,
  * the executor will be removed
  *
+ * Dynamic resource allocation is also extended to work for structured 
streaming use case.
+ * (micro-batch paradigm).
+ * For it to work we would still need the above configs + few additional 
configs.
+ *
+ * For executor allocation, In traditional DRA target number of executors are 
added based on the

Review Comment:
   From my understanding, in batch queries each stage can have varied resource requirements depending upon what it does, so DRA has `schedulerBacklogTimeout` to figure out when it should ask for more resources. So the 
[pendingTasks](https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L289)
 are determined by [the pending tasks of current 
stage](https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L883).
 I have [modified 
it](https://github.com/apache/spark/pull/42352/files#diff-fdddb0421641035be18233c212f0e3ccd2d6a49d345bd0cd4eac08fc4d911e21R1003)
 to consider the pending tasks of other stages as well because structured 
streaming deals with micro-batches and we want to scale out if the there are 
still other stages pending in the same micro-batch.
   
   For example:
   
   With the current DRA code, suppose `spark.dynamicAllocation.schedulerBacklogTimeout` is set to 6 seconds and we use that for a structured streaming job where a micro-batch consists of 4 stages, each running for at most 5 seconds.
   
   Then it wouldn't scale out even after the 20 seconds (5+5+5+5) of the micro-batch have passed, because no single stage stays backlogged longer than the 6-second timeout.
   
   But with the changes I have made, while running the second stage at the 6th second it figures out that other stages in the micro-batch are still pending, so it scales out appropriately.






Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]

2024-04-23 Thread via GitHub


sahnib commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1577018044


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateWatermarkSuite.scala:
##
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.sql.Timestamp
+import java.time.{Instant, LocalDateTime, ZoneId}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.ExtendedAnalysisException
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecution}
+import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
+import org.apache.spark.sql.functions.window
+import org.apache.spark.sql.internal.SQLConf
+
+class ColumnRenamedStatefulProcessor
+  extends StatefulProcessor[String, InputEventRow, RenamedInputEventRow]
+  with Logging {
+
+  override def init(outputMode: OutputMode): Unit = { }

Review Comment:
   I don't think we specifically need state variables, as we only deal with the 
event time of the generated output. Let me know if you think otherwise.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]

2024-04-23 Thread via GitHub


sahnib commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1577017638


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala:
##
@@ -107,25 +109,67 @@ case class EventTimeWatermarkExec(
   }
 
   // Update the metadata on the eventTime column to include the desired delay.
-  override val output: Seq[Attribute] = child.output.map { a =>
-if (a semanticEquals eventTime) {
-  val updatedMetadata = new MetadataBuilder()
-.withMetadata(a.metadata)
-.putLong(EventTimeWatermark.delayKey, delayMs)
-.build()
-  a.withMetadata(updatedMetadata)
-} else if (a.metadata.contains(EventTimeWatermark.delayKey)) {
-  // Remove existing watermark
-  val updatedMetadata = new MetadataBuilder()
-.withMetadata(a.metadata)
-.remove(EventTimeWatermark.delayKey)
-.build()
-  a.withMetadata(updatedMetadata)
-} else {
-  a
-}
+  override val output: Seq[Attribute] = {
+val delayMs = EventTimeWatermark.getDelayMs(delay)
+updateEventTimeColumn(child.output, delayMs, eventTime)
   }
 
   override protected def withNewChildInternal(newChild: SparkPlan): 
EventTimeWatermarkExec =
 copy(child = newChild)
 }
+
+/**
+ * Updates the event time column to [[eventTime]] in the child output.
+ * Any watermark calculations performed after this node will use the
+ * updated eventTimeColumn.
+ *
+ * This node also ensures that output emitted by the child node adheres
+ * to watermark. If the child node emits rows which are older than global
+ * watermark, the node will throw an query execution error and fail the user
+ * query.
+ */
+case class UpdateEventTimeColumnExec(
+eventTime: Attribute,
+delay: CalendarInterval,
+eventTimeWatermarkForEviction: Option[Long],
+child: SparkPlan) extends UnaryExecNode with Logging {
+
+  override protected def doExecute(): RDD[InternalRow] = {
+child.execute().mapPartitions[InternalRow](
+  dataIterator => {
+val watermarkExpression = WatermarkSupport.watermarkExpression(
+  Some(eventTime), eventTimeWatermarkForEviction)
+
+if (watermarkExpression.isEmpty) {
+  // watermark should always be defined in this node.
+  throw QueryExecutionErrors.cannotGetEventTimeWatermarkError()
+}
+
+val predicate = Predicate.create(watermarkExpression.get, child.output)
+new Iterator[InternalRow] {
+  override def hasNext: Boolean = dataIterator.hasNext
+  override def next(): InternalRow = {
+val nextRow = dataIterator.next()
+if (predicate.eval(nextRow)) {
+  // child node emitted a row which is older than current watermark
+  // which is not allowed
+  throw QueryExecutionErrors.emittedRowsAreOlderThanWatermark(
+eventTimeWatermarkForEviction.get)
+}
+nextRow
+  }
+}
+  },
+  preservesPartitioning = true

Review Comment:
   As per my understanding, this preserves the partitioning from the child to 
upstream operators, and we want to keep that partitioning, because the 
UpdateEventTimeColumnExec node does not modify the order of input rows.
   
   However, I am confused by the Javadoc of `mapPartitions`, which suggests this 
parameter should only be used for pair RDDs. @HeartSaVioR do you have more 
context here?
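   
   For context, a minimal sketch of what the flag does at the RDD level (an 
illustrative pair-RDD example, not code from this PR): `preservesPartitioning` 
only controls whether the parent's partitioner is carried over to the result.
   ```scala
   import org.apache.spark.HashPartitioner
   import org.apache.spark.sql.SparkSession
   
   val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
   val sc = spark.sparkContext
   
   // A pair RDD hash-partitioned by key.
   val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
     .partitionBy(new HashPartitioner(2))
   
   // Values change but keys do not, so the existing partitioner stays valid;
   // preservesPartitioning = true keeps pairs.partitioner on the result.
   val mapped = pairs.mapPartitions(
     iter => iter.map { case (k, v) => (k, v.toUpperCase) },
     preservesPartitioning = true)
   
   assert(mapped.partitioner == pairs.partitioner)
   ```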



##
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##
@@ -739,6 +741,128 @@ class KeyValueGroupedDataset[K, V] private[sql](
 )
   }
 
+  /**
+   * (Scala-specific)
+   * Invokes methods defined in the stateful processor used in arbitrary state 
API v2.
+   * We allow the user to act on per-group set of input rows along with keyed 
state and the
+   * user can choose to output/return 0 or more rows.
+   * For a streaming dataframe, we will repeatedly invoke the interface 
methods for new rows
+   * in each trigger and the user's state/state variables will be stored 
persistently across
+   * invocations.
+   *
+   * @tparam U The type of the output objects. Must be encodable to Spark SQL 
types.
+   * @param statefulProcessor Instance of statefulProcessor whose functions 
will
+   *  be invoked by the operator.
+   * @param timeMode  The time mode semantics of the stateful 
processor for timers and TTL.
+   * @param eventTimeColumnName eventTime column in the output dataset. Any 
operations after
+   *transformWithState will use the new 
eventTimeColumn. The user
+   *needs to ensure that the eventTime for emitted 
output adheres to
+   *the watermark boundary, otherwise streaming 
query will fail.
+   * @param outputModeThe output mode of the stateful processor.
+   *
+   * See [[Encoder]] for more details on what types are encodable to Spark SQL.

Re: [PR] [REFERENCE][DO-NOT-MERGE] Implementation of python streaminng data source [spark]

2024-04-23 Thread via GitHub


chaoqin-li1123 closed pull request #45065: [REFERENCE][DO-NOT-MERGE] 
Implementation of python streaminng data source
URL: https://github.com/apache/spark/pull/45065


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]

2024-04-23 Thread via GitHub


sahnib commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1577007339


##
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##
@@ -739,6 +741,128 @@ class KeyValueGroupedDataset[K, V] private[sql](
 )
   }
 
+  /**
+   * (Scala-specific)
+   * Invokes methods defined in the stateful processor used in arbitrary state 
API v2.
+   * We allow the user to act on per-group set of input rows along with keyed 
state and the
+   * user can choose to output/return 0 or more rows.
+   * For a streaming dataframe, we will repeatedly invoke the interface 
methods for new rows
+   * in each trigger and the user's state/state variables will be stored 
persistently across
+   * invocations.
+   *
+   * @tparam U The type of the output objects. Must be encodable to Spark SQL 
types.
+   * @param statefulProcessor Instance of statefulProcessor whose functions 
will
+   *  be invoked by the operator.
+   * @param timeMode  The time mode semantics of the stateful 
processor for timers and TTL.
+   * @param eventTimeColumnName eventTime column in the output dataset. Any 
operations after
+   *transformWithState will use the new 
eventTimeColumn. The user
+   *needs to ensure that the eventTime for emitted 
output adheres to
+   *the watermark boundary, otherwise streaming 
query will fail.
+   * @param outputModeThe output mode of the stateful processor.
+   *
+   * See [[Encoder]] for more details on what types are encodable to Spark SQL.
+   */
+  private[sql] def transformWithState[U: Encoder](
+   statefulProcessor: StatefulProcessor[K, V, U],
+   timeMode: TimeMode,
+   eventTimeColumnName: String,
+   outputMode: OutputMode): Dataset[U] = {
+val existingWatermarkDelay = logicalPlan.flatMap {
+  case EventTimeWatermark(_, delay, _) => Seq(delay)
+  case _ => Seq()
+}
+
+if (existingWatermarkDelay.isEmpty) {
+  throw QueryCompilationErrors.cannotAssignEventTimeColumn()
+}
+
+val transformWithState = TransformWithState[K, V, U](
+  groupingAttributes,
+  dataAttributes,
+  statefulProcessor,
+  timeMode,
+  outputMode,
+  child = logicalPlan
+)
+
+val twsDS = Dataset[U](
+  sparkSession,
+  transformWithState
+)
+
+val delay = existingWatermarkDelay.head
+
+Dataset[U](sparkSession, EliminateEventTimeWatermark(
+  UpdateEventTimeWatermarkColumn(
+UnresolvedAttribute(eventTimeColumnName),
+delay,
+twsDS.logicalPlan)))
+  }
+
+  /**
+   * (Scala-specific)
+   * Invokes methods defined in the stateful processor used in arbitrary state 
API v2.
+   * Functions as the function above, but with additional initial state.
+   *
+   * @tparam U The type of the output objects. Must be encodable to Spark SQL 
types.
+   * @tparam S The type of initial state objects. Must be encodable to Spark 
SQL types.
+   * @param statefulProcessor   Instance of statefulProcessor whose functions 
will
+   *be invoked by the operator.
+   * @param timeModeThe time mode semantics of the stateful 
processor for
+   *timers and TTL.
+   * @param eventTimeColumnName eventTime column in the output dataset. Any 
operations after
+   *transformWithState will use the new 
eventTimeColumn. The user
+   *needs to ensure that the eventTime for emitted 
output adheres to
+   *the watermark boundary, otherwise streaming 
query will fail.
+   * @param outputMode  The output mode of the stateful processor.
+   * @param initialStateUser provided initial state that will be used 
to initiate state for
+   *the query in the first batch.
+   *
+   * See [[Encoder]] for more details on what types are encodable to Spark SQL.
+   */
+  private[sql] def transformWithState[U: Encoder, S: Encoder](
+  statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
+  timeMode: TimeMode,
+  eventTimeColumnName: String,
+  outputMode: OutputMode,
+  initialState: KeyValueGroupedDataset[K, S]): Dataset[U] = {
+val existingWatermarkDelay = logicalPlan.collect {
+  case EventTimeWatermark(_, delay, _) => delay
+}
+
+if (existingWatermarkDelay.isEmpty) {

Review Comment:
   added a private method to consolidate. 
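   
   For readers following the thread, a sketch of what such a consolidated helper 
could look like inside `KeyValueGroupedDataset` (the helper name is assumed and 
the required imports are taken as already in scope; the actual method in the PR 
may differ):
   ```scala
   // Collect the watermark delay from the logical plan, failing analysis if the
   // input has no watermark defined (both transformWithState overloads call this).
   private def validateAndRetrieveWatermarkDelay(): CalendarInterval = {
     val existingWatermarkDelay = logicalPlan.collect {
       case EventTimeWatermark(_, delay, _) => delay
     }
     if (existingWatermarkDelay.isEmpty) {
       throw QueryCompilationErrors.cannotAssignEventTimeColumn()
     }
     existingWatermarkDelay.head
   }
   ```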



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, 

Re: [PR] [SPARK-47921][CONNECT] Fix ExecuteJobTag creation in ExecuteHolder [spark]

2024-04-23 Thread via GitHub


gengliangwang commented on code in PR #46140:
URL: https://github.com/apache/spark/pull/46140#discussion_r1577006944


##
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala:
##
@@ -37,7 +37,7 @@ class SparkConnectServerListenerSuite
 
   private var kvstore: ElementTrackingStore = _
 
-  private val jobTag = ExecuteJobTag("sessionId", "userId", "operationId")
+  private val jobTag = ExecuteJobTag("userId", "sessionId", "operationId")

Review Comment:
   From the code changes, there doesn't seem to be a bug: all the callers are 
using the correct argument order. This PR seems just to unify the usages of 
such functions.
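   
   Regardless of which side ends up being fixed, one way to make this class of 
mix-up visible at the call site is to use named arguments (the parameter names 
below are assumed for illustration only):
   ```scala
   // Hypothetical call site: with named arguments, a swapped order cannot go unnoticed.
   val jobTag = ExecuteJobTag(
     userId = "userId",
     sessionId = "sessionId",
     operationId = "operationId")
   ```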
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [CONNECT] Use v1 as spark connect go library starting version [spark-connect-go]

2024-04-23 Thread via GitHub


viirya commented on PR #19:
URL: https://github.com/apache/spark-connect-go/pull/19#issuecomment-2073619182

   Got it. I didn't know the actual meaning of `v34`, so I thought that you had 
v1, v2, ... v34 before.
   
   I am also not sure why the version number is embedded into the Go module 
name. Is it a common practice seen in other Go modules too? So once this Go 
library has a v2 release, will you increase `v1` to `v2` in the module name 
again?
   
   I am not familiar with the Go module naming policy. I am trying to compare 
this with JVM package names and release versions. For example, we don't 
actually have 3.4 in the Spark JVM package name.
   
   So I'm wondering if it makes sense to remove the version number `v34` from 
this Go module name?
   
   Of course, if it is commonly seen in other Go modules, then it is okay to 
rename it from `v34` to `v1`.
   
   It is also better to let other contributors of this Go library take a 
look too.
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47921][CONNECT] Fix ExecuteJobTag creation in ExecuteHolder [spark]

2024-04-23 Thread via GitHub


ueshin commented on PR #46140:
URL: https://github.com/apache/spark/pull/46140#issuecomment-2073596694

   Also cc @gengliangwang @juliuszsompolski 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [CONNECT] Use v1 as spark connect go library starting version [spark-connect-go]

2024-04-23 Thread via GitHub


hiboyang commented on PR #19:
URL: https://github.com/apache/spark-connect-go/pull/19#issuecomment-2073588176

   > > Use v1 for this Spark Connect Go Client library, instead of previously 
using v34.
   > 
   > For Spart K8s operator, we begin with 0.1.0 as the first release version 
as discussion in mailing list.
   > 
   > For the v1 and v34 versions here, I'm not sure about if it is same case. 
Do you already have v34 release?
   
   Hmm... I feel there is some confusion about the meaning of `v1` and `v34` here. 
`v34` is part of the Go module name, like `github.com/apache/spark-connect-go/v34`; 
it is not a release. Due to the recent discussion, we want to decouple the 
versioning of this project from Spark versioning, and thus modify the module name 
from `github.com/apache/spark-connect-go/v34` to 
`github.com/apache/spark-connect-go/v1`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [CONNECT] Use v1 as spark connect go library starting version [spark-connect-go]

2024-04-23 Thread via GitHub


hiboyang commented on code in PR #19:
URL: https://github.com/apache/spark-connect-go/pull/19#discussion_r1576977935


##
go.mod:
##
@@ -13,7 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-module github.com/apache/spark-connect-go/v34
+module github.com/apache/spark-connect-go/v1

Review Comment:
   Previously we used `v34` to align with Spark 3.4 at that time; it is not an 
accumulated version number. Things will still work after changing `v34` to 
`v1`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [CONNECT] Use v1 as spark connect go library starting version [spark-connect-go]

2024-04-23 Thread via GitHub


hiboyang commented on PR #19:
URL: https://github.com/apache/spark-connect-go/pull/19#issuecomment-2073580127

   > Does this Spark connect go library have any previous release yet?
   
   No release previously. For a Go repo, people can just import the module 
directly, so such a "release" is actually not required.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47583][CORE] SQL core: Migrate logError with variables to structured logging framework [spark]

2024-04-23 Thread via GitHub


gengliangwang commented on code in PR #45969:
URL: https://github.com/apache/spark/pull/45969#discussion_r1576968211


##
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##
@@ -74,14 +76,18 @@ case class AdaptiveSparkPlanExec(
 
   @transient private val lock = new Object()
 
-  @transient private val logOnLevel: ( => String) => Unit = 
conf.adaptiveExecutionLogLevel match {
-case "TRACE" => logTrace(_)
-case "DEBUG" => logDebug(_)
-case "INFO" => logInfo(_)
-case "WARN" => logWarning(_)
-case "ERROR" => logError(_)
-case _ => logDebug(_)
-  }
+  @transient private val logOnLevel: ( => MessageWithContext) => Unit =
+conf.adaptiveExecutionLogLevel match {
+  case "TRACE" =>
+def fn(log: => LogEntry): Unit = logTrace(log.message)

Review Comment:
   There is a `logTrace` accepting MessageWithContext now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47583][CORE] SQL core: Migrate logError with variables to structured logging framework [spark]

2024-04-23 Thread via GitHub


gengliangwang commented on code in PR #45969:
URL: https://github.com/apache/spark/pull/45969#discussion_r1576967703


##
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##
@@ -353,8 +359,10 @@ case class AdaptiveSparkPlanExec(
   val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
   if (newCost < origCost ||
 (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
-logOnLevel("Plan changed:\n" +
-  sideBySide(currentPhysicalPlan.treeString, 
newPhysicalPlan.treeString).mkString("\n"))
+val plans =
+  sideBySide(currentPhysicalPlan.treeString, 
newPhysicalPlan.treeString).mkString("\n")
+logOnLevel(log"Plan changed:\n" +
+  log"${MDC(QUERY_PLAN, plans)}")

Review Comment:
   ```suggestion
   lazy val plans =
 sideBySide(currentPhysicalPlan.treeString, 
newPhysicalPlan.treeString).mkString("\n")
   logOnLevel(log"Plan changed:\n${MDC(QUERY_PLAN, plans)}")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47597][STREAMING] Streaming: Migrate logInfo with variables to structured logging framework [spark]

2024-04-23 Thread via GitHub


dtenedor commented on PR #46192:
URL: https://github.com/apache/spark/pull/46192#issuecomment-2073520844

   @gengliangwang FYI


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47597][STREAMING] Streaming: Migrate logInfo with variables to structured logging framework [spark]

2024-04-23 Thread via GitHub


dtenedor opened a new pull request, #46192:
URL: https://github.com/apache/spark/pull/46192

   ### What changes were proposed in this pull request?
   
   Migrate logInfo calls with variables in the streaming module to the structured 
logging framework. This transforms the logInfo entries of the following API
   ```
   def logInfo(msg: => String): Unit
   ```
   to
   ```
   def logInfo(entry: LogEntry): Unit
   ```
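   
   For illustration, a migrated call site typically changes as sketched below (a 
hedged example: the `MDC` helper and the `QUERY_ID` log key are assumed to be 
available from the structured logging framework, and are not necessarily the 
keys touched by this PR):
   ```scala
   // before: plain string interpolation
   logInfo(s"Started streaming query $queryId")
   
   // after: the variable is wrapped in an MDC so it becomes a structured field
   logInfo(log"Started streaming query ${MDC(QUERY_ID, queryId)}")
   ```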
   
   ### Why are the changes needed?
   
   To enhance Apache Spark's logging system by implementing structured logging.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, logs from the streaming module will contain additional MDC entries.
   
   ### How was this patch tested?
   
   Compiler and scala style checks, as well as code review.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


dongjoon-hyun commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1576880695


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverIngressFeatureStep.scala:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model.HasMetadata
+import io.fabric8.kubernetes.api.model.networking.v1.{HTTPIngressPathBuilder, 
IngressBuilder, IngressRuleBuilder, ServiceBackendPortBuilder}
+
+import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, 
SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants.SPARK_APP_ID_LABEL
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.UI._
+
+class DriverIngressFeatureStep(kubernetesConf: KubernetesDriverConf)
+  extends KubernetesFeatureConfigStep with Logging {
+
+  private lazy val driverServiceName: String = kubernetesConf.driverServiceName
+
+  override def configurePod(pod: SparkPod): SparkPod = pod
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+if (!kubernetesConf.get(UI_ENABLED) || 
!kubernetesConf.get(KUBERNETES_INGRESS_ENABLED)) {
+  logInfo(
+s"Skip building ingress for Spark UI, due to" +
+  s"${UI_ENABLED.key} or ${KUBERNETES_INGRESS_ENABLED.key} is false.")
+  return Seq.empty
+}
+
+val appId = kubernetesConf.appId
+val uiPort = kubernetesConf.get(UI_PORT)
+val ingressHost = kubernetesConf.get(KUBERNETES_INGRESS_HOST_PATTERN) 
match {
+  case Some(ingressHostPattern) =>
+ingressHostPattern.replace("{{APP_ID}}", appId)
+  case None =>
+logWarning(s"Skip building ingress for Spark UI, due to " +
+  s"${KUBERNETES_INGRESS_HOST_PATTERN.key} is absent.")
+return Seq.empty
+}
+
+val customLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+  kubernetesConf.sparkConf,
+  KUBERNETES_DRIVER_INGRESS_LABEL_PREFIX)
+val labels = customLabels ++ Map(SPARK_APP_ID_LABEL -> appId)
+
+val annotations = KubernetesUtils.parsePrefixedKeyValuePairs(
+  kubernetesConf.sparkConf,
+  KUBERNETES_DRIVER_INGRESS_ANNOTATION_PREFIX)
+
+val path = new HTTPIngressPathBuilder()
+  .withPath(kubernetesConf.get(KUBERNETES_INGRESS_PATH))
+  .withPathType(kubernetesConf.get(KUBERNETES_INGRESS_PATH_TYPE))
+  .withNewBackend()
+.withNewService()
+  .withName(driverServiceName)
+  .withPort(new ServiceBackendPortBuilder().withNumber(uiPort).build())
+.endService()
+  .endBackend()
+  .build()
+
+val uiRule = new IngressRuleBuilder()
+  .withHost(ingressHost)
+  .withNewHttp()
+.addToPaths(path)
+  .endHttp()
+  .build()
+
+val ingress = new IngressBuilder()

Review Comment:
   It seems that there is no owner setting here. Could you double-check whether 
this increases the number of ingresses monotonically? I'm wondering who removes 
this ingress, and when.
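   
   For illustration, a hedged sketch of what an explicit owner reference on the 
ingress could look like with the fabric8 model classes (the builder method names 
follow the fabric8 API as I understand it; please verify against the version on 
the classpath):
   ```scala
   import java.util.Collections
   
   import io.fabric8.kubernetes.api.model.{OwnerReferenceBuilder, Pod}
   import io.fabric8.kubernetes.api.model.networking.v1.Ingress
   
   // Tie the ingress lifecycle to the driver pod so that deleting the driver pod
   // lets Kubernetes garbage-collect the ingress as well.
   def withDriverOwnerReference(ingress: Ingress, driverPod: Pod): Ingress = {
     val ownerRef = new OwnerReferenceBuilder()
       .withApiVersion(driverPod.getApiVersion)
       .withKind(driverPod.getKind)
       .withName(driverPod.getMetadata.getName)
       .withUid(driverPod.getMetadata.getUid)
       .withController(true)
       .build()
     ingress.getMetadata.setOwnerReferences(Collections.singletonList(ownerRef))
     ingress
   }
   ```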



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


dongjoon-hyun commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1576873744


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala:
##
@@ -749,13 +749,52 @@ private[spark] object Config extends Logging {
   .checkValue(value => value > 0, "Gracefully shutdown period must be a 
positive time value")
   .createWithDefaultString("20s")
 
+  val KUBERNETES_INGRESS_ENABLED =
+ConfigBuilder("spark.kubernetes.driver.ingress.enabled")
+  .doc("Whether to create ingress entry of the driver service for external 
UI access " +
+"in cluster mode. This only takes effect when `spark.ui.enabled` is 
true.")
+  .version("4.0.0")
+  .booleanConf
+  .createWithDefault(false)
+
+  val KUBERNETES_INGRESS_HOST_PATTERN =
+ConfigBuilder("spark.kubernetes.driver.ingress.host")
+  .doc("Host for driver UI ingress, {{APP_ID}} will be replaced by 
application ID.")
+  .version("4.0.0")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_INGRESS_CLASS_NAME =
+ConfigBuilder("spark.kubernetes.driver.ingress.ingressClassName")
+  .doc("Reference the IngressClass that should be used to implement the 
ingress for spark UI.")
+  .version("4.0.0")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_INGRESS_PATH =
+ConfigBuilder("spark.kubernetes.driver.ingress.path")

Review Comment:
   May I ask when you would want to override this? And is it safe in terms of 
security?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


dongjoon-hyun commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1576870634


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala:
##
@@ -749,13 +749,52 @@ private[spark] object Config extends Logging {
   .checkValue(value => value > 0, "Gracefully shutdown period must be a 
positive time value")
   .createWithDefaultString("20s")
 
+  val KUBERNETES_INGRESS_ENABLED =
+ConfigBuilder("spark.kubernetes.driver.ingress.enabled")

Review Comment:
   I believe we don't need this because `KUBERNETES_INGRESS_HOST_PATTERN` is 
always mandatory. Shall we remove this redundant configuration?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


dongjoon-hyun commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1576868897


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala:
##
@@ -73,7 +73,8 @@ private[spark] class KubernetesDriverBuilder {
   new HadoopConfDriverFeatureStep(conf),
   new KerberosConfDriverFeatureStep(conf),
   new PodTemplateConfigMapStep(conf),
-  new LocalDirsFeatureStep(conf)) ++ userFeatures
+  new LocalDirsFeatureStep(conf),
+  new DriverIngressFeatureStep(conf)) ++ userFeatures

Review Comment:
   Can we provide a way for the Spark job to run successfully even if 
`DriverIngressFeatureStep` and its K8s resources fail? IIUC, if something goes 
wrong, the Spark job will not start, will it?
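   
   One hedged way to express that intent (illustrative only; `buildIngressResources()` 
is a hypothetical helper, and this only covers spec construction, so failures when 
the resources are actually created on the API server would still need handling in 
the submission client):
   ```scala
   import scala.util.{Failure, Success, Try}
   
   // Sketch: never let ingress spec construction break job submission.
   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
     Try(buildIngressResources()) match {
       case Success(resources) => resources
       case Failure(e) =>
         logWarning("Failed to build ingress for Spark UI; continuing without it.", e)
         Seq.empty
     }
   }
   ```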



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


dongjoon-hyun commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1576866768


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverIngressFeatureStep.scala:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model.HasMetadata
+import io.fabric8.kubernetes.api.model.networking.v1.{HTTPIngressPathBuilder, 
IngressBuilder, IngressRuleBuilder, ServiceBackendPortBuilder}
+
+import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, 
SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants.SPARK_APP_ID_LABEL
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.UI._
+
+class DriverIngressFeatureStep(kubernetesConf: KubernetesDriverConf)
+  extends KubernetesFeatureConfigStep with Logging {
+
+  private lazy val driverServiceName: String = kubernetesConf.driverServiceName
+
+  override def configurePod(pod: SparkPod): SparkPod = pod
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+if (!kubernetesConf.get(UI_ENABLED) || 
!kubernetesConf.get(KUBERNETES_INGRESS_ENABLED)) {
+  logInfo(
+s"Skip building ingress for Spark UI, due to" +
+  s"${UI_ENABLED.key} or ${KUBERNETES_INGRESS_ENABLED.key} is false.")
+  return Seq.empty
+}
+
+val appId = kubernetesConf.appId
+val uiPort = kubernetesConf.get(UI_PORT)
+val ingressHost = kubernetesConf.get(KUBERNETES_INGRESS_HOST_PATTERN) 
match {
+  case Some(ingressHostPattern) =>
+ingressHostPattern.replace("{{APP_ID}}", appId)
+  case None =>
+logWarning(s"Skip building ingress for Spark UI, due to " +
+  s"${KUBERNETES_INGRESS_HOST_PATTERN.key} is absent.")
+return Seq.empty

Review Comment:
   This `return` statement implies that we might have a too-complex code path 
here.
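   
   A hedged sketch of one way to flatten the control flow (purely illustrative; 
`buildIngress` is a hypothetical helper): fold all the skip conditions into a 
single `Option` so no early `return` is needed.
   ```scala
   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
     val ingressEnabled =
       kubernetesConf.get(UI_ENABLED) && kubernetesConf.get(KUBERNETES_INGRESS_ENABLED)
     val maybeHost = kubernetesConf.get(KUBERNETES_INGRESS_HOST_PATTERN)
       .filter(_ => ingressEnabled)
       .map(_.replace("{{APP_ID}}", kubernetesConf.appId))
   
     maybeHost match {
       case Some(host) => Seq(buildIngress(host))
       case None =>
         logInfo("Skipping ingress for Spark UI: it is disabled or no host pattern is set.")
         Seq.empty
     }
   }
   ```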



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


dongjoon-hyun commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1576865495


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverIngressFeatureStep.scala:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model.HasMetadata
+import io.fabric8.kubernetes.api.model.networking.v1.{HTTPIngressPathBuilder, 
IngressBuilder, IngressRuleBuilder, ServiceBackendPortBuilder}
+
+import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, 
SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants.SPARK_APP_ID_LABEL
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.UI._
+
+class DriverIngressFeatureStep(kubernetesConf: KubernetesDriverConf)
+  extends KubernetesFeatureConfigStep with Logging {
+
+  private lazy val driverServiceName: String = kubernetesConf.driverServiceName
+
+  override def configurePod(pod: SparkPod): SparkPod = pod
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+if (!kubernetesConf.get(UI_ENABLED) || 
!kubernetesConf.get(KUBERNETES_INGRESS_ENABLED)) {
+  logInfo(
+s"Skip building ingress for Spark UI, due to" +
+  s"${UI_ENABLED.key} or ${KUBERNETES_INGRESS_ENABLED.key} is false.")
+  return Seq.empty
+}
+
+val appId = kubernetesConf.appId
+val uiPort = kubernetesConf.get(UI_PORT)
+val ingressHost = kubernetesConf.get(KUBERNETES_INGRESS_HOST_PATTERN) 
match {

Review Comment:
   Instead of warning like this, please check this together with the condition 
at line 37 and proceed as a no-op.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


dongjoon-hyun commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1576864646


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverIngressFeatureStep.scala:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model.HasMetadata
+import io.fabric8.kubernetes.api.model.networking.v1.{HTTPIngressPathBuilder, 
IngressBuilder, IngressRuleBuilder, ServiceBackendPortBuilder}
+
+import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, 
SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants.SPARK_APP_ID_LABEL
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.UI._
+
+class DriverIngressFeatureStep(kubernetesConf: KubernetesDriverConf)
+  extends KubernetesFeatureConfigStep with Logging {
+
+  private lazy val driverServiceName: String = kubernetesConf.driverServiceName
+
+  override def configurePod(pod: SparkPod): SparkPod = pod
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+if (!kubernetesConf.get(UI_ENABLED) || 
!kubernetesConf.get(KUBERNETES_INGRESS_ENABLED)) {
+  logInfo(

Review Comment:
   Since this is not a bug fix, please move this `logInfo` out of this PR (into 
a separate one) with a different message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


dongjoon-hyun commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1576861859


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala:
##
@@ -94,6 +96,20 @@ private[spark] class KubernetesDriverConf(
 custom.getOrElse(KubernetesConf.getResourceNamePrefix(appName))
   }
 
+  val driverServiceName: String = {
+val preferredServiceName = s"$resourceNamePrefix$DRIVER_SVC_POSTFIX"
+if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
+  preferredServiceName
+} else {
+  val randomServiceId = KubernetesUtils.uniqueID(clock)
+  val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
+  logWarning(s"Driver's hostname would preferably be 
$preferredServiceName, but this is " +
+s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling 
back to use " +
+s"$shorterServiceName as the driver service's name.")
+  shorterServiceName
+}
+  }

Review Comment:
   Could you make a new PR for this, moving `driverServiceName` from 
`DriverServiceFeatureStep` to `KubernetesDriverConf`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


dongjoon-hyun commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1576861006


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala:
##
@@ -94,6 +96,20 @@ private[spark] class KubernetesDriverConf(
 custom.getOrElse(KubernetesConf.getResourceNamePrefix(appName))
   }
 
+  val driverServiceName: String = {

Review Comment:
   Shall we use `lazy val`, since this is not used inside `KubernetesDriverConf` itself?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


dongjoon-hyun commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1576856363


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala:
##
@@ -108,5 +101,5 @@ private[spark] object DriverServiceFeatureStep {
   val DRIVER_BIND_ADDRESS_KEY = config.DRIVER_BIND_ADDRESS.key
   val DRIVER_HOST_KEY = config.DRIVER_HOST_ADDRESS.key
   val DRIVER_SVC_POSTFIX = "-driver-svc"
-  val MAX_SERVICE_NAME_LENGTH = KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH
+  val MAX_SERVICE_NAME_LENGTH = KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH - 
"-ingress".length

Review Comment:
   Please don't do that. This is a huge regression for the existing system, 
because it reduces the available length of the service name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


dongjoon-hyun commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1576855288


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala:
##
@@ -87,16 +77,19 @@ private[spark] class DriverServiceFeatureStep(
   .withName(DRIVER_PORT_NAME)
   .withPort(driverPort)
   .withNewTargetPort(driverPort)
+  .withProtocol("TCP")

Review Comment:
   To @pan3793, I'm not sure we need to have this code in `master`. I'd 
recommend removing this here and not making an independent PR for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [WIP] Ensure that ForeachBatch can use libraries imported externally [spark]

2024-04-23 Thread via GitHub


ericm-db opened a new pull request, #46191:
URL: https://github.com/apache/spark/pull/46191

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47921][CONNECT] Fix ExecuteJobTag creation in ExecuteHolder [spark]

2024-04-23 Thread via GitHub


ueshin commented on PR #46140:
URL: https://github.com/apache/spark/pull/46140#issuecomment-2073342960

   We may rather want to fix the arguments of the `apply` function to be 
consistent with how they are used and with the other similar functions:
   
   
https://github.com/apache/spark/blob/2a64b706135a54e177ca2e5300ccec15c4dae8bb/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala#L296-L304
   
   
https://github.com/apache/spark/blob/2a64b706135a54e177ca2e5300ccec15c4dae8bb/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala#L312-L321


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47633][SQL][3.5] Include right-side plan output in `LateralJoin#allAttributes` for more consistent canonicalization [spark]

2024-04-23 Thread via GitHub


bersprockets opened a new pull request, #46190:
URL: https://github.com/apache/spark/pull/46190

   This is a backport of #45763 to branch-3.5.
   
   ### What changes were proposed in this pull request?
   
   Modify `LateralJoin` to include right-side plan output in `allAttributes`.
   
   ### Why are the changes needed?
   
   In the following example, the view v1 is cached, but a query of v1 does not 
use the cache:
   ```
   CREATE or REPLACE TEMP VIEW t1(c1, c2) AS VALUES (0, 1), (1, 2);
   CREATE or REPLACE TEMP VIEW t2(c1, c2) AS VALUES (0, 1), (1, 2);
   
   create or replace temp view v1 as
   select *
   from t1
   join lateral (
 select c1 as a, c2 as b
 from t2)
   on c1 = a;
   
   cache table v1;
   
   explain select * from v1;
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- BroadcastHashJoin [c1#180], [a#173], Inner, BuildRight, false
  :- LocalTableScan [c1#180, c2#181]
  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, 
false] as bigint)),false), [plan_id=113]
 +- LocalTableScan [a#173, b#174]
   ```
   
   The canonicalized version of the `LateralJoin` node is not consistent when 
there is a join condition. For example, for the above query, the join condition 
is canonicalized as follows:
   ```
   Before canonicalization: Some((c1#174 = a#167))
   After canonicalization:  Some((none#0 = none#167))
   ```
   You can see that the `exprId` for the second operand of `EqualTo` is not 
normalized (it remains 167). That's because the attribute `a` from the 
right-side plan is not included in `allAttributes`.
   
   This PR adds right-side attributes to `allAttributes` so that references to 
right-side attributes in the join condition are normalized during 
canonicalization.
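   
   For readers of this backport, the fix is essentially an override along the 
following lines (a sketch only; the exact field names of `LateralJoin` should be 
checked against the original PR #45763):
   ```scala
   // Include the right-side (lateral subquery) plan output so that join-condition
   // references to those attributes are normalized during canonicalization.
   override def allAttributes: AttributeSeq = left.output ++ right.plan.output
   ```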
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   New test.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]

2024-04-23 Thread via GitHub


sahnib commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1576757237


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -78,15 +78,20 @@ case class TransformWithStateExec(
   override def shortName: String = "transformWithStateExec"
 
   override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = {
-timeMode match {
-  case ProcessingTime =>
-// TODO: check if we can return true only if actual timers are 
registered, or there is
-// expired state
-true
-  case EventTime =>
-eventTimeWatermarkForEviction.isDefined &&
-  newInputWatermark > eventTimeWatermarkForEviction.get
-  case _ => false
+if (outputMode == OutputMode.Append || outputMode == OutputMode.Update) {

Review Comment:
   This condition should always be true for timeMode set to ProcessingTime. I 
have refactored this condition. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]

2024-04-23 Thread via GitHub


sahnib commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1576756401


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala:
##
@@ -107,25 +109,67 @@ case class EventTimeWatermarkExec(
   }
 
   // Update the metadata on the eventTime column to include the desired delay.
-  override val output: Seq[Attribute] = child.output.map { a =>
-if (a semanticEquals eventTime) {
-  val updatedMetadata = new MetadataBuilder()
-.withMetadata(a.metadata)
-.putLong(EventTimeWatermark.delayKey, delayMs)
-.build()
-  a.withMetadata(updatedMetadata)
-} else if (a.metadata.contains(EventTimeWatermark.delayKey)) {
-  // Remove existing watermark
-  val updatedMetadata = new MetadataBuilder()
-.withMetadata(a.metadata)
-.remove(EventTimeWatermark.delayKey)
-.build()
-  a.withMetadata(updatedMetadata)
-} else {
-  a
-}
+  override val output: Seq[Attribute] = {
+val delayMs = EventTimeWatermark.getDelayMs(delay)
+updateEventTimeColumn(child.output, delayMs, eventTime)
   }
 
   override protected def withNewChildInternal(newChild: SparkPlan): 
EventTimeWatermarkExec =
 copy(child = newChild)
 }
+
+/**
+ * Updates the event time column to [[eventTime]] in the child output.
+ * Any watermark calculations performed after this node will use the
+ * updated eventTimeColumn.
+ *
+ * This node also ensures that output emitted by the child node adheres
+ * to watermark. If the child node emits rows which are older than global
+ * watermark, the node will throw an query execution error and fail the user
+ * query.
+ */
+case class UpdateEventTimeColumnExec(
+eventTime: Attribute,
+delay: CalendarInterval,
+eventTimeWatermarkForEviction: Option[Long],
+child: SparkPlan) extends UnaryExecNode with Logging {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]

2024-04-23 Thread via GitHub


sahnib commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1576754116


##
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##
@@ -739,6 +741,128 @@ class KeyValueGroupedDataset[K, V] private[sql](
 )
   }
 
+  /**
+   * (Scala-specific)

Review Comment:
   Default values for API params have an issue with Scala/Java compatibility. I 
guess that's why none of the other APIs have default params either.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]

2024-04-23 Thread via GitHub


sahnib commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1576751501


##
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##
@@ -762,6 +890,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
   timeMode: TimeMode,
   outputMode: OutputMode,
   initialState: KeyValueGroupedDataset[K, S],
+  eventTimeColumnName: String,

Review Comment:
   Actually, we need multiple separate Java APIs (for combinations of 
eventTimeColumn and state). I have added them, thanks. 



##
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##
@@ -762,6 +890,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
   timeMode: TimeMode,
   outputMode: OutputMode,
   initialState: KeyValueGroupedDataset[K, S],
+  eventTimeColumnName: String,
   outputEncoder: Encoder[U],
   initialStateEncoder: Encoder[S]): Dataset[U] = {
 transformWithState(statefulProcessor, timeMode,

Review Comment:
   Fixed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [DO-NOT-REVIEW] PySpark Dataframe doc test improvement [spark]

2024-04-23 Thread via GitHub


WweiL opened a new pull request, #46189:
URL: https://github.com/apache/spark/pull/46189

   
   
   ### What changes were proposed in this pull request?
   
   Improve the doc test; before, it used a batch DataFrame and didn't really start 
the test.
   
   ### Why are the changes needed?
   
   Test coverage improvement
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Test only addition
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


pan3793 commented on PR #46184:
URL: https://github.com/apache/spark/pull/46184#issuecomment-2073145778

   cc @dongjoon-hyun @yaooqinn @LuciferYang @EnricoMi, I would appreciate it if you 
could have a look at this idea


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47365][PYTHON] Add _toArrow() DataFrame method to PySpark [spark]

2024-04-23 Thread via GitHub


ianmcook commented on code in PR #45481:
URL: https://github.com/apache/spark/pull/45481#discussion_r1576718516


##
python/pyspark/sql/connect/dataframe.py:
##
@@ -1775,6 +1775,12 @@ def _to_table(self) -> Tuple["pa.Table", 
Optional[StructType]]:
 assert table is not None
 return (table, schema)
 
+def _toArrow(self) -> "pa.Table":
+table = self._to_table()[0]
+return table
+
+_toArrow.__doc__ = PySparkDataFrame._toArrow.__doc__

Review Comment:
   @HyukjinKwon is this line no longer needed after 
https://github.com/apache/spark/pull/46129?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]

2024-04-23 Thread via GitHub


pan3793 commented on code in PR #46184:
URL: https://github.com/apache/spark/pull/46184#discussion_r1576715545


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala:
##
@@ -87,16 +77,19 @@ private[spark] class DriverServiceFeatureStep(
   .withName(DRIVER_PORT_NAME)
   .withPort(driverPort)
   .withNewTargetPort(driverPort)
+  .withProtocol("TCP")

Review Comment:
   This is not related to the feature proposed by this PR; I just found that our 
internal cluster (1.19, and I know Spark has already dropped official support for 
such an old version) complains that `protocol` is omitted and fails to create 
resources without this change. If this change is acceptable, I can submit it in an 
independent PR.
   
   ```
   24/04/23 23:48:25 TRACE HttpLoggingInterceptor: -HTTP START-
   24/04/23 23:48:25 TRACE HttpLoggingInterceptor: > PATCH 
https://.org:6443/api/v1/namespaces/spark/services/spark-8e2c0b8f0ba4d21f-driver-svc?fieldManager=fabric8=true
   24/04/23 23:48:25 TRACE HttpLoggingInterceptor: > Authorization: Bearer 
*
   24/04/23 23:48:25 TRACE HttpLoggingInterceptor: > User-Agent: 
fabric8-kubernetes-client/6.12.1
   24/04/23 23:48:25 TRACE HttpLoggingInterceptor: 
{"apiVersion":"v1","kind":"Service","metadata":{"labels":{"spark-app-selector":"spark-0d5c3db763904e7585d9801295ded9ed"},"name":"spark-8e2c0b8f0ba4d21f-driver-svc","ownerReferences":[{"apiVersion":"v1","kind":"Pod","controller":true,"name":"org-apache-spark-examples-driversubmissiontest-ee4d378f0ba4d21a-driver","uid":"e29f255a-dd1f-45c3-89c4-1c30e1b51d6e"}]},"spec":{"clusterIP":"None","ports":[{"name":"driver-rpc-port","port":7078,"targetPort":7078},{"name":"blockmanager","port":7079,"targetPort":7079},{"name":"spark-ui","port":4040,"targetPort":4040}],"selector":{"spark-version":"4.0.0-SNAPSHOT","spark-app-selector":"spark-0d5c3db763904e7585d9801295ded9ed","spark-app-name":"org-apache-spark-examples-driversubmissiontest","spark-role":"driver"}}}
   24/04/23 23:48:25 TRACE HttpLoggingInterceptor: < 500 Internal Server Error
   24/04/23 23:48:25 TRACE HttpLoggingInterceptor: < cache-control: no-cache, 
private
   24/04/23 23:48:25 TRACE HttpLoggingInterceptor: < content-length: 229
   24/04/23 23:48:25 TRACE HttpLoggingInterceptor: < content-type: 
application/json
   24/04/23 23:48:25 TRACE HttpLoggingInterceptor: < date: Tue, 23 Apr 2024 
15:48:25 GMT
   24/04/23 23:48:25 TRACE HttpLoggingInterceptor: 
{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"failed
 to create typed patch object: .spec.ports: element 0: associative list with 
keys has an element that omits key field \"protocol\"","code":500}
   
   24/04/23 23:48:25 TRACE HttpLoggingInterceptor: -HTTP END-
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]

2024-04-23 Thread via GitHub


sahnib commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1576708999


##
common/utils/src/main/resources/error/error-conditions.json:
##
@@ -1057,6 +1063,14 @@
 },
 "sqlState" : "4274K"
   },
+  "EMITTING_ROWS_OLDER_THAN_WATERMARK_NOT_ALLOWED" : {
+"message" : [
+  "Previous node emitted rows which had eventTime older than current 
watermark value ",

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]

2024-04-23 Thread via GitHub


anishshri-db commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1576703836


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -78,15 +78,20 @@ case class TransformWithStateExec(
   override def shortName: String = "transformWithStateExec"
 
   override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = {
-timeMode match {
-  case ProcessingTime =>
-// TODO: check if we can return true only if actual timers are 
registered, or there is
-// expired state
-true
-  case EventTime =>
-eventTimeWatermarkForEviction.isDefined &&
-  newInputWatermark > eventTimeWatermarkForEviction.get
-  case _ => false
+if (outputMode == OutputMode.Append || outputMode == OutputMode.Update) {

Review Comment:
   Hmm - won't we have an issue with `Append` mode using a proc-time-based time 
mode?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]

2024-04-23 Thread via GitHub


anishshri-db commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1576701553


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala:
##
@@ -107,25 +109,67 @@ case class EventTimeWatermarkExec(
   }
 
   // Update the metadata on the eventTime column to include the desired delay.
-  override val output: Seq[Attribute] = child.output.map { a =>
-if (a semanticEquals eventTime) {
-  val updatedMetadata = new MetadataBuilder()
-.withMetadata(a.metadata)
-.putLong(EventTimeWatermark.delayKey, delayMs)
-.build()
-  a.withMetadata(updatedMetadata)
-} else if (a.metadata.contains(EventTimeWatermark.delayKey)) {
-  // Remove existing watermark
-  val updatedMetadata = new MetadataBuilder()
-.withMetadata(a.metadata)
-.remove(EventTimeWatermark.delayKey)
-.build()
-  a.withMetadata(updatedMetadata)
-} else {
-  a
-}
+  override val output: Seq[Attribute] = {
+val delayMs = EventTimeWatermark.getDelayMs(delay)
+updateEventTimeColumn(child.output, delayMs, eventTime)
   }
 
   override protected def withNewChildInternal(newChild: SparkPlan): 
EventTimeWatermarkExec =
 copy(child = newChild)
 }
+
+/**
+ * Updates the event time column to [[eventTime]] in the child output.
+ * Any watermark calculations performed after this node will use the
+ * updated eventTimeColumn.
+ *
+ * This node also ensures that output emitted by the child node adheres
+ * to watermark. If the child node emits rows which are older than global
+ * watermark, the node will throw a query execution error and fail the user
+ * query.
+ */
+case class UpdateEventTimeColumnExec(
+eventTime: Attribute,
+delay: CalendarInterval,
+eventTimeWatermarkForEviction: Option[Long],
+child: SparkPlan) extends UnaryExecNode with Logging {
+
+  override protected def doExecute(): RDD[InternalRow] = {
+child.execute().mapPartitions[InternalRow](
+  dataIterator => {
+val watermarkExpression = WatermarkSupport.watermarkExpression(
+  Some(eventTime), eventTimeWatermarkForEviction)
+
+if (watermarkExpression.isEmpty) {
+  // watermark should always be defined in this node.
+  throw QueryExecutionErrors.cannotGetEventTimeWatermarkError()
+}
+
+val predicate = Predicate.create(watermarkExpression.get, child.output)
+new Iterator[InternalRow] {
+  override def hasNext: Boolean = dataIterator.hasNext
+  override def next(): InternalRow = {
+val nextRow = dataIterator.next()
+if (predicate.eval(nextRow)) {
+  // child node emitted a row which is older than current watermark
+  // which is not allowed
+  throw QueryExecutionErrors.emittedRowsAreOlderThanWatermark(
+eventTimeWatermarkForEviction.get)
+}
+nextRow
+  }
+}
+  },
+  preservesPartitioning = true

Review Comment:
   What does this do exactly ? Could we add a comment just to clarify for the 
reader ?
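
   For context, a standalone hedged sketch (not the PR code) of what 
`preservesPartitioning` means on `RDD.mapPartitions`:

   ```scala
   // Minimal sketch: preservesPartitioning = true tells Spark the function does not
   // change keys, so an existing partitioner is kept and downstream operators do
   // not need to reshuffle. Here the rows only pass through, they are not re-keyed.
   import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

   object PreservesPartitioningSketch {
     def main(args: Array[String]): Unit = {
       val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
       val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(new HashPartitioner(2))

       val validated = pairs.mapPartitions(
         iter => iter.map(identity),      // rows pass through unchanged
         preservesPartitioning = true)    // keep the HashPartitioner, no extra shuffle

       assert(validated.partitioner == pairs.partitioner)
       sc.stop()
     }
   }
   ```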



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]

2024-04-23 Thread via GitHub


anishshri-db commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1576698993


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala:
##
@@ -107,25 +109,67 @@ case class EventTimeWatermarkExec(
   }
 
   // Update the metadata on the eventTime column to include the desired delay.
-  override val output: Seq[Attribute] = child.output.map { a =>
-if (a semanticEquals eventTime) {
-  val updatedMetadata = new MetadataBuilder()
-.withMetadata(a.metadata)
-.putLong(EventTimeWatermark.delayKey, delayMs)
-.build()
-  a.withMetadata(updatedMetadata)
-} else if (a.metadata.contains(EventTimeWatermark.delayKey)) {
-  // Remove existing watermark
-  val updatedMetadata = new MetadataBuilder()
-.withMetadata(a.metadata)
-.remove(EventTimeWatermark.delayKey)
-.build()
-  a.withMetadata(updatedMetadata)
-} else {
-  a
-}
+  override val output: Seq[Attribute] = {
+val delayMs = EventTimeWatermark.getDelayMs(delay)
+updateEventTimeColumn(child.output, delayMs, eventTime)
   }
 
   override protected def withNewChildInternal(newChild: SparkPlan): 
EventTimeWatermarkExec =
 copy(child = newChild)
 }
+
+/**
+ * Updates the event time column to [[eventTime]] in the child output.
+ * Any watermark calculations performed after this node will use the
+ * updated eventTimeColumn.
+ *
+ * This node also ensures that output emitted by the child node adheres
+ * to watermark. If the child node emits rows which are older than global
+ * watermark, the node will throw a query execution error and fail the user
+ * query.
+ */
+case class UpdateEventTimeColumnExec(
+eventTime: Attribute,
+delay: CalendarInterval,
+eventTimeWatermarkForEviction: Option[Long],
+child: SparkPlan) extends UnaryExecNode with Logging {

Review Comment:
   Are we using `Logging` here? Should we remove this and the import otherwise?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]

2024-04-23 Thread via GitHub


anishshri-db commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1576698149


##
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##
@@ -739,6 +741,128 @@ class KeyValueGroupedDataset[K, V] private[sql](
 )
   }
 
+  /**
+   * (Scala-specific)
+   * Invokes methods defined in the stateful processor used in arbitrary state 
API v2.
+   * We allow the user to act on per-group set of input rows along with keyed 
state and the
+   * user can choose to output/return 0 or more rows.
+   * For a streaming dataframe, we will repeatedly invoke the interface 
methods for new rows
+   * in each trigger and the user's state/state variables will be stored 
persistently across
+   * invocations.
+   *
+   * @tparam U The type of the output objects. Must be encodable to Spark SQL 
types.
+   * @param statefulProcessor Instance of statefulProcessor whose functions 
will
+   *  be invoked by the operator.
+   * @param timeMode  The time mode semantics of the stateful 
processor for timers and TTL.
+   * @param eventTimeColumnName eventTime column in the output dataset. Any 
operations after
+   *transformWithState will use the new 
eventTimeColumn. The user
+   *needs to ensure that the eventTime for emitted 
output adheres to
+   *the watermark boundary, otherwise streaming 
query will fail.
+   * @param outputModeThe output mode of the stateful processor.
+   *
+   * See [[Encoder]] for more details on what types are encodable to Spark SQL.
+   */
+  private[sql] def transformWithState[U: Encoder](
+   statefulProcessor: StatefulProcessor[K, V, U],
+   timeMode: TimeMode,
+   eventTimeColumnName: String,
+   outputMode: OutputMode): Dataset[U] = {
+val existingWatermarkDelay = logicalPlan.flatMap {
+  case EventTimeWatermark(_, delay, _) => Seq(delay)
+  case _ => Seq()
+}
+
+if (existingWatermarkDelay.isEmpty) {
+  throw QueryCompilationErrors.cannotAssignEventTimeColumn()
+}
+
+val transformWithState = TransformWithState[K, V, U](
+  groupingAttributes,
+  dataAttributes,
+  statefulProcessor,
+  timeMode,
+  outputMode,
+  child = logicalPlan
+)
+
+val twsDS = Dataset[U](
+  sparkSession,
+  transformWithState
+)
+
+val delay = existingWatermarkDelay.head
+
+Dataset[U](sparkSession, EliminateEventTimeWatermark(
+  UpdateEventTimeWatermarkColumn(
+UnresolvedAttribute(eventTimeColumnName),
+delay,
+twsDS.logicalPlan)))
+  }
+
+  /**
+   * (Scala-specific)
+   * Invokes methods defined in the stateful processor used in arbitrary state 
API v2.
+   * Functions as the function above, but with additional initial state.
+   *
+   * @tparam U The type of the output objects. Must be encodable to Spark SQL 
types.
+   * @tparam S The type of initial state objects. Must be encodable to Spark 
SQL types.
+   * @param statefulProcessor   Instance of statefulProcessor whose functions 
will
+   *be invoked by the operator.
+   * @param timeModeThe time mode semantics of the stateful 
processor for
+   *timers and TTL.
+   * @param eventTimeColumnName eventTime column in the output dataset. Any 
operations after
+   *transformWithState will use the new 
eventTimeColumn. The user
+   *needs to ensure that the eventTime for emitted 
output adheres to
+   *the watermark boundary, otherwise streaming 
query will fail.
+   * @param outputMode  The output mode of the stateful processor.
+   * @param initialStateUser provided initial state that will be used 
to initiate state for
+   *the query in the first batch.
+   *
+   * See [[Encoder]] for more details on what types are encodable to Spark SQL.
+   */
+  private[sql] def transformWithState[U: Encoder, S: Encoder](
+  statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
+  timeMode: TimeMode,
+  eventTimeColumnName: String,
+  outputMode: OutputMode,
+  initialState: KeyValueGroupedDataset[K, S]): Dataset[U] = {
+val existingWatermarkDelay = logicalPlan.collect {
+  case EventTimeWatermark(_, delay, _) => delay
+}
+
+if (existingWatermarkDelay.isEmpty) {

Review Comment:
   Could we consolidate this code with the one above using a private function? I 
guess the only diff here is that we are passing the `initialState` dataset?
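
   A heavily simplified, hedged sketch (hypothetical names, not the PR code) of the 
kind of consolidation being suggested, with the initial state passed as an `Option`:

   ```scala
   // Hypothetical sketch only: the watermark lookup and validation shared by both
   // overloads could live in one private helper, with the initial state optional.
   object ConsolidationSketch {
     private def buildTransformWithState(
         eventTimeColumnName: String,
         existingWatermarkDelay: Seq[String],
         initialState: Option[String]): String = {
       require(existingWatermarkDelay.nonEmpty,
         "cannot assign an event-time column without an upstream watermark")
       s"tws(col=$eventTimeColumnName, delay=${existingWatermarkDelay.head}, " +
         s"init=${initialState.getOrElse("none")})"
     }

     def main(args: Array[String]): Unit = {
       println(buildTransformWithState("eventTime", Seq("10 seconds"), None))
       println(buildTransformWithState("eventTime", Seq("10 seconds"), Some("initialStateDs")))
     }
   }
   ```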



##
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##
@@ -739,6 +741,128 @@ class KeyValueGroupedDataset[K, V] private[sql](
 )
   }
 
+  /**
+   * (Scala-specific)
+   * Invokes methods defined in the stateful processor used in arbitrary 

Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]

2024-04-23 Thread via GitHub


anishshri-db commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1576691015


##
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##
@@ -762,6 +890,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
   timeMode: TimeMode,
   outputMode: OutputMode,
   initialState: KeyValueGroupedDataset[K, S],
+  eventTimeColumnName: String,

Review Comment:
   Don't we need a separate Java API for this ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]

2024-04-23 Thread via GitHub


anishshri-db commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1576693458


##
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##
@@ -739,6 +741,128 @@ class KeyValueGroupedDataset[K, V] private[sql](
 )
   }
 
+  /**
+   * (Scala-specific)

Review Comment:
   This will add 4 more interfaces - 2 in Scala and potentially 2 in Java. 
Could we consolidate the existing interfaces to optionally take an 
`eventTimeColumnName` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]

2024-04-23 Thread via GitHub


anishshri-db commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1576692106


##
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##
@@ -762,6 +890,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
   timeMode: TimeMode,
   outputMode: OutputMode,
   initialState: KeyValueGroupedDataset[K, S],
+  eventTimeColumnName: String,
   outputEncoder: Encoder[U],
   initialStateEncoder: Encoder[S]): Dataset[U] = {
 transformWithState(statefulProcessor, timeMode,

Review Comment:
   We don't use `eventTimeColumnName` here ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]

2024-04-23 Thread via GitHub


anishshri-db commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1576686293


##
common/utils/src/main/resources/error/error-conditions.json:
##
@@ -1057,6 +1063,14 @@
 },
 "sqlState" : "4274K"
   },
+  "EMITTING_ROWS_OLDER_THAN_WATERMARK_NOT_ALLOWED" : {
+"message" : [
+  "Previous node emitted rows which had eventTime older than current 
watermark value ",

Review Comment:
   nit: `current_watermark_value=` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47604][CORE] Resource managers: Migrate logInfo with variables to structured logging framework [spark]

2024-04-23 Thread via GitHub


gengliangwang commented on PR #46130:
URL: https://github.com/apache/spark/pull/46130#issuecomment-2073052204

   Thanks, merging to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47604][CORE] Resource managers: Migrate logInfo with variables to structured logging framework [spark]

2024-04-23 Thread via GitHub


gengliangwang closed pull request #46130: [SPARK-47604][CORE] Resource 
managers: Migrate logInfo with variables to structured logging framework
URL: https://github.com/apache/spark/pull/46130


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]

2024-04-23 Thread via GitHub


ericm-db commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1576656803


##
core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:
##
@@ -91,11 +91,27 @@ private[spark] class StreamingPythonRunner(
   new 
BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, 
bufferSize))
 
 val resFromPython = dataIn.readInt()
+if (resFromPython != 0) {
+  val errMessage = PythonWorkerUtils.readUTF(dataIn)
+  throw streamingPythonRunnerDidNotInitialize(resFromPython, errMessage)
+}
 logInfo(s"Runner initialization succeeded (returned $resFromPython).")
 
 (dataOut, dataIn)
   }
 
+  def streamingPythonRunnerDidNotInitialize(resFromPython: Int, errMessage: 
String):

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]

2024-04-23 Thread via GitHub


rangadi commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1576629583


##
core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:
##
@@ -91,11 +91,27 @@ private[spark] class StreamingPythonRunner(
   new 
BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, 
bufferSize))
 
 val resFromPython = dataIn.readInt()
+if (resFromPython != 0) {
+  val errMessage = PythonWorkerUtils.readUTF(dataIn)
+  throw streamingPythonRunnerDidNotInitialize(resFromPython, errMessage)
+}
 logInfo(s"Runner initialization succeeded (returned $resFromPython).")
 
 (dataOut, dataIn)
   }
 
+  def streamingPythonRunnerDidNotInitialize(resFromPython: Int, errMessage: 
String):

Review Comment:
   minor: rename to '...RunnerInitializationFailure' or '...FailedToInitialize' 
... 



##
core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:
##
@@ -91,11 +91,27 @@ private[spark] class StreamingPythonRunner(
   new 
BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, 
bufferSize))
 
 val resFromPython = dataIn.readInt()
+if (resFromPython != 0) {
+  val errMessage = PythonWorkerUtils.readUTF(dataIn)
+  throw streamingPythonRunnerDidNotInitialize(resFromPython, errMessage)
+}
 logInfo(s"Runner initialization succeeded (returned $resFromPython).")
 
 (dataOut, dataIn)
   }
 
+  def streamingPythonRunnerDidNotInitialize(resFromPython: Int, errMessage: 
String):
+StreamingPythonRunnerInitializationException = {
+new StreamingPythonRunnerInitializationException(resFromPython, errMessage)
+  }
+
+  class StreamingPythonRunnerInitializationException(resFromPython: Int, 
errMessage: String)
+extends SparkPythonException(
+  errorClass = "STREAMING_PYTHON_RUNNER_DID_NOT_INITIALIZE",

Review Comment:
   Similar rename here. This is what the user will see. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]

2024-04-23 Thread via GitHub


anishshri-db commented on PR #45376:
URL: https://github.com/apache/spark/pull/45376#issuecomment-2072985699

   @sahnib - seems like there are still conflicts on the base branch ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47864][FOLLOWUP][PYTHON][DOCS] Fix minor typo: "MLLib" -> "MLlib" [spark]

2024-04-23 Thread via GitHub


xinrong-meng commented on PR #46174:
URL: https://github.com/apache/spark/pull/46174#issuecomment-2072958632

   Merged to master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47864][FOLLOWUP][PYTHON][DOCS] Fix minor typo: "MLLib" -> "MLlib" [spark]

2024-04-23 Thread via GitHub


xinrong-meng closed pull request #46174: [SPARK-47864][FOLLOWUP][PYTHON][DOCS] 
Fix minor typo: "MLLib" -> "MLlib"
URL: https://github.com/apache/spark/pull/46174


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47864][FOLLOWUP][PYTHON][DOCS] Fix minor typo: "MLLib" -> "MLlib" [spark]

2024-04-23 Thread via GitHub


xinrong-meng commented on PR #46174:
URL: https://github.com/apache/spark/pull/46174#issuecomment-2072955519

   LGTM, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47050][SQL] Collect and publish partition level metrics for V1 [spark]

2024-04-23 Thread via GitHub


snmvaughan commented on PR #46188:
URL: https://github.com/apache/spark/pull/46188#issuecomment-2072876115

cc @cloud-fan


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47050][SQL] Collect and publish partition level metrics for V1 [spark]

2024-04-23 Thread via GitHub


snmvaughan opened a new pull request, #46188:
URL: https://github.com/apache/spark/pull/46188

   We currently capture metrics which include the number of files, bytes and 
rows for a task along with the updated partitions. 
This change captures metrics for each updated partition, reporting the 
partition sub-paths along with the number of files, bytes, and rows per 
partition for each task.
   
   ### What changes were proposed in this pull request?
   
   1. Update the `WriteTaskStatsTracker` implementation to associate a 
partition with the file during writing, and to track the number of rows written 
to each file.  The final stats now include a map of partitions and the 
associated partition stats
   2. Update the `WriteJobStatsTracker` implementation to capture the partition 
subpaths and to publish a new Event to the listener bus (see the sketch below). 
The processed stats aggregate the statistics for each partition.
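
   A hedged sketch of how such an event could be consumed from the listener bus; 
the event class and field names below are hypothetical placeholders, not the names 
introduced by this PR:

   ```scala
   // Hypothetical event shape for illustration only; the PR defines the real class.
   import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

   case class PartitionWriteStatsEventSketch(
       partitionPath: String, numFiles: Int, numBytes: Long, numRows: Long)
     extends SparkListenerEvent

   // Custom events arrive through onOtherEvent on a SparkListener; register the
   // listener with spark.sparkContext.addSparkListener(new PartitionStatsListenerSketch).
   class PartitionStatsListenerSketch extends SparkListener {
     override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
       case e: PartitionWriteStatsEventSketch =>
         println(s"${e.partitionPath}: files=${e.numFiles}, bytes=${e.numBytes}, rows=${e.numRows}")
       case _ => // ignore unrelated events
     }
   }
   ```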
   
   ### Why are the changes needed?
   This increases our understanding of written data by tracking the impact of each 
task on our datasets.
   
   ### Does this PR introduce _any_ user-facing change?
   This makes partition-level data accessible through a new event.
   
   ### How was this patch tested?
   In addition to the new unit tests, this was run in a Kubernetes environment 
writing tables with differing partitioning strategies and validating the 
reported stats.  Unit tests using both `InsertIntoHadoopFsRelationCommand` and 
`InsertIntoHiveTable` now verify partition stats when dynamic partitioning is 
enabled.  We also verified that the aggregated partition metrics matched the 
existing metrics for number of files, bytes, and rows.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47958][TESTS] Change LocalSchedulerBackend to notify scheduler of executor on start [spark]

2024-04-23 Thread via GitHub


davintjong-db opened a new pull request, #46187:
URL: https://github.com/apache/spark/pull/46187

   
   
   ### What changes were proposed in this pull request?
   
   Change `LocalSchedulerBackend` to call `reviveOffers` on start (after the local 
executor is set up) so that the task scheduler knows about it. This matches the 
behavior of `CoarseGrainedSchedulerBackend`, which calls an equivalent method on 
executor registration.
   
   ### Why are the changes needed?
   
   When using LocalSchedulerBackend, the task scheduler will not know about the 
executor until a task is run, which can lead to unexpected behavior in tests.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   Running existing tests.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]

2024-04-23 Thread via GitHub


ericm-db commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1576514370


##
core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:
##
@@ -91,6 +91,12 @@ private[spark] class StreamingPythonRunner(
   new 
BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, 
bufferSize))
 
 val resFromPython = dataIn.readInt()
+if (resFromPython != 0) {
+  val errMessage = PythonWorkerUtils.readUTF(dataIn)
+  throw new PythonException(s"Streaming Runner initialization failed" +
+s" (returned $resFromPython). " +
+s"Error message: $errMessage", null)
+}

Review Comment:
   Created a new error class, PTAL



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47956][SQL] Sanity check for unresolved LCA reference [spark]

2024-04-23 Thread via GitHub


dongjoon-hyun commented on PR #46185:
URL: https://github.com/apache/spark/pull/46185#issuecomment-2072773098

   Merged to master for Apache Spark 4.0.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


