[PR] [MINOR][PYTHON][TESTS] Make `test_creation_index` deterministic [spark]
zhengruifeng opened a new pull request, #46200: URL: https://github.com/apache/spark/pull/46200 ### What changes were proposed in this pull request? Make `test_creation_index` deterministic. ### Why are the changes needed? It may fail in some environments:
```
FAIL [16.261s]: test_creation_index (pyspark.pandas.tests.frame.test_constructor.FrameConstructorTests.test_creation_index)
--
Traceback (most recent call last):
  File "/home/jenkins/python/pyspark/testing/pandasutils.py", line 91, in _assert_pandas_equal
    assert_frame_equal(
  File "/databricks/python3/lib/python3.11/site-packages/pandas/_testing/asserters.py", line 1257, in assert_frame_equal
    assert_index_equal(
  File "/databricks/python3/lib/python3.11/site-packages/pandas/_testing/asserters.py", line 407, in assert_index_equal
    raise_assert_detail(obj, msg, left, right)
  File "/databricks/python3/lib/python3.11/site-packages/pandas/_testing/asserters.py", line 665, in raise_assert_detail
    raise AssertionError(msg)
AssertionError: DataFrame.index are different

DataFrame.index values are different (40.0 %)
[left]:  Int64Index([2, 3, 4, 6, 5], dtype='int64')
[right]: Int64Index([2, 3, 4, 5, 6], dtype='int64')
```
### Does this PR introduce _any_ user-facing change? No, test only. ### How was this patch tested? CI. ### Was this patch authored or co-authored using generative AI tooling? No. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
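The failure pattern above is about row ordering, not values: distributed results can come back in any index order. The PR's exact diff is not shown here, but the usual fix is to sort both sides before comparing. A minimal sketch with plain pandas (the frames below are hypothetical stand-ins for the test data):

```python
import pandas as pd

# Rows from a distributed engine may arrive in any order, reproducing
# the flaky comparison from the traceback above.
left = pd.DataFrame({"a": [1, 2, 3, 4, 5]}, index=[2, 3, 4, 6, 5])
right = pd.DataFrame({"a": [1, 2, 3, 5, 4]}, index=[2, 3, 4, 5, 6])

# Comparing directly would fail on index order; sorting both sides by
# index first makes the assertion deterministic.
pd.testing.assert_frame_equal(left.sort_index(), right.sort_index())
```

Sorting by index (or by a key column) is preferable to disabling order checks, since it still verifies that every row matches.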
Re: [PR] [SPARK-47948][PYTHON] Upgrade the minimum `Pandas` version to 2.0.0 [spark]
itholic commented on code in PR #46175: URL: https://github.com/apache/spark/pull/46175#discussion_r1577236035 ## dev/create-release/spark-rm/Dockerfile: ## @@ -37,7 +37,7 @@ ENV DEBCONF_NONINTERACTIVE_SEEN true # These arguments are just for reuse and not really meant to be customized. ARG APT_INSTALL="apt-get install --no-install-recommends -y" -ARG PIP_PKGS="sphinx==4.5.0 mkdocs==1.1.2 numpy==1.20.3 pydata_sphinx_theme==0.13.3 ipython==7.19.0 nbsphinx==0.8.0 numpydoc==1.1.0 jinja2==3.1.2 twine==3.4.1 sphinx-plotly-directive==0.1.3 sphinx-copybutton==0.5.2 pandas==1.5.3 pyarrow==10.0.1 plotly==5.4.0 markupsafe==2.0.1 docutils<0.17 grpcio==1.62.0 protobuf==4.21.6 grpcio-status==1.62.0 googleapis-common-protos==1.56.4" +ARG PIP_PKGS="sphinx==4.5.0 mkdocs==1.1.2 numpy==1.20.3 pydata_sphinx_theme==0.13.3 ipython==7.19.0 nbsphinx==0.8.0 numpydoc==1.1.0 jinja2==3.1.2 twine==3.4.1 sphinx-plotly-directive==0.1.3 sphinx-copybutton==0.5.2 pandas==2.0.3 pyarrow==10.0.1 plotly==5.4.0 markupsafe==2.0.1 docutils<0.17 grpcio==1.62.0 protobuf==4.21.6 grpcio-status==1.62.0 googleapis-common-protos==1.56.4" Review Comment: Yeah, at least from my local envs they all pass the test suites.
Re: [PR] [SPARK-47914][SQL] Do not display the splits parameter in Rang [spark]
guixiaowen commented on PR #46136: URL: https://github.com/apache/spark/pull/46136#issuecomment-2073986240 > Mind checking the test failures? @HyukjinKwon I made the changes. Do you have any other questions?
Re: [PR] [SPARK-47764][CORE][SQL] Cleanup shuffle dependencies based on ShuffleCleanupMode [spark]
bozhang2820 commented on code in PR #45930: URL: https://github.com/apache/spark/pull/45930#discussion_r1577219567 ## sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala: ## @@ -869,6 +874,8 @@ case class AdaptiveExecutionContext(session: SparkSession, qe: QueryExecution) { */ val stageCache: TrieMap[SparkPlan, ExchangeQueryStageExec] = new TrieMap[SparkPlan, ExchangeQueryStageExec]() + + val shuffleIds: TrieMap[Int, Boolean] = new TrieMap[Int, Boolean]() Review Comment: Updated
Re: [PR] [SPARK-47953][DOCS] MsSQLServer: Document Mapping Spark SQL Data Types to Microsoft SQL Server [spark]
yaooqinn commented on PR #46177: URL: https://github.com/apache/spark/pull/46177#issuecomment-2073972978 Thank you @dongjoon-hyun
Re: [PR] [MINOR][DOCS] Add `docs/_generated/` to .gitignore [spark]
yaooqinn commented on PR #46178: URL: https://github.com/apache/spark/pull/46178#issuecomment-2073972776 Thank you @dongjoon-hyun @zhengruifeng
Re: [PR] [SPARK-47949][SQL][DOCKER][TESTS] MsSQLServer: Bump up mssql docker image version to 2022-CU12-GDR1-ubuntu-22.04 [spark]
yaooqinn commented on PR #46176: URL: https://github.com/apache/spark/pull/46176#issuecomment-2073972487 Thank you @dongjoon-hyun
Re: [PR] [SPARK-47963][CORE][TESTS] Add an external LogKey usage case in UT [spark]
LuciferYang commented on code in PR #46193: URL: https://github.com/apache/spark/pull/46193#discussion_r1577206238 ## common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala: ## @@ -144,6 +146,22 @@ trait LoggingSuiteBase } } + // An enumeration value that is not defined in `LogKey`, you can define it anywhere + private val externalLogKey = LogKey.VALUE + private val externalLog = log"${MDC(externalLogKey, "External log message.")}" + test("Logging with external LogKey") { Review Comment: friendly ping @gengliangwang, what's your take on this? Is `StructuredLogging` a feature that users can utilize? If so, how can we make it easier for users to extend the definition of `LogKey`?
Re: [PR] [SPARK-47771][PYTHON][DOCS][TESTS][FOLLOWUP] Make `max_by, min_by` doctests deterministic [spark]
zhengruifeng commented on PR #46196: URL: https://github.com/apache/spark/pull/46196#issuecomment-2073941487 thanks @LuciferYang and @HyukjinKwon merged to master
Re: [PR] [SPARK-47771][PYTHON][DOCS][TESTS][FOLLOWUP] Make `max_by, min_by` doctests deterministic [spark]
zhengruifeng closed pull request #46196: [SPARK-47771][PYTHON][DOCS][TESTS][FOLLOWUP] Make `max_by, min_by` doctests deterministic URL: https://github.com/apache/spark/pull/46196
Re: [PR] [SPARK-47963][CORE][TESTS] Add an external LogKey usage case in UT [spark]
panbingkun commented on PR #46193: URL: https://github.com/apache/spark/pull/46193#issuecomment-2073933759 Wait a moment, I'm checking
Re: [PR] [SPARK-47963][CORE][TESTS] Add an external LogKey usage case in UT [spark]
LuciferYang commented on code in PR #46193: URL: https://github.com/apache/spark/pull/46193#discussion_r1577159936 ## common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala: ## @@ -144,6 +146,22 @@ trait LoggingSuiteBase } } + // An enumeration value that is not defined in `LogKey`, you can define it anywhere + private val externalLogKey = LogKey.VALUE + private val externalLog = log"${MDC(externalLogKey, "External log message.")}" + test("Logging with external LogKey") { Review Comment: For this case, the key for `externalLog.context` will be always ~an empty string~ "value". Is this expected?
Re: [PR] [SPARK-47963][CORE][TESTS] Add an external LogKey usage case in UT [spark]
LuciferYang commented on code in PR #46193: URL: https://github.com/apache/spark/pull/46193#discussion_r1577167593 ## common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala: ## @@ -144,6 +146,22 @@ trait LoggingSuiteBase } } + // An enumeration value that is not defined in `LogKey`, you can define it anywhere + private val externalLogKey = LogKey.VALUE + private val externalLog = log"${MDC(externalLogKey, "External log message.")}" + test("Logging with external LogKey") { Review Comment: In https://github.com/apache/spark/pull/46186/files, I try to relax the type of `MDC.key` to `Enumeration#Value`, which allows other enumeration types to be used as `MDC.key`, adding a bit of extensibility.
Re: [PR] [SPARK-47964][PYTHON][CONNECT] Hide SQLContext and HiveContext in pyspark-connect [spark]
HyukjinKwon commented on code in PR #46194: URL: https://github.com/apache/spark/pull/46194#discussion_r1577167281 ## python/pyspark/__init__.py: ## @@ -125,8 +125,10 @@ def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any: # for backward compatibility references. sys.modules["pyspark.context"] = context -# for back compatibility -from pyspark.sql import SQLContext, HiveContext, Row # noqa: F401 +# for back compatibility +from pyspark.sql import SQLContext, HiveContext # noqa: F401 + +from pyspark.sql import Row Review Comment: ```suggestion from pyspark.sql import Row # noqa: F401 ```
Re: [PR] [SPARK-47933][PYTHON][CONNECT][FOLLOW-UP] Add a check of `__name__` at `_with_origin` [spark]
HyukjinKwon commented on PR #46198: URL: https://github.com/apache/spark/pull/46198#issuecomment-2073905817 Merged to master.
Re: [PR] [SPARK-47933][PYTHON][CONNECT][FOLLOW-UP] Add a check of `__name__` at `_with_origin` [spark]
HyukjinKwon closed pull request #46198: [SPARK-47933][PYTHON][CONNECT][FOLLOW-UP] Add a check of `__name__` at `_with_origin` URL: https://github.com/apache/spark/pull/46198
[PR] [SPARK-47933][PYTHON][CONNECT][FOLLOW-UP] Add a check of `__name__` at `_with_origin` [spark]
HyukjinKwon opened a new pull request, #46198: URL: https://github.com/apache/spark/pull/46198 ### What changes were proposed in this pull request? This PR is a followup of https://github.com/apache/spark/pull/46155 that adds a check of `__name__` at `_with_origin`. ### Why are the changes needed? It is possible for a callable instance without a `__name__` and/or `__module__` attribute to be wrapped, for example `functools.partial`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? `./bin/pyspark` ### Was this patch authored or co-authored using generative AI tooling? No.
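To illustrate the edge case the PR describes (a hedged sketch, not the PR's actual `_with_origin` code): `functools.partial` objects are callable yet carry no `__name__` attribute, so a wrapper that reads metadata must guard the lookup, for example with `getattr` and a fallback:

```python
import functools

def add(x, y):
    return x + y

inc = functools.partial(add, 1)

# partial objects are callable but expose no __name__, so unguarded
# access like inc.__name__ raises AttributeError.
assert callable(inc) and not hasattr(inc, "__name__")

# A defensive wrapper falls back to a placeholder instead of crashing.
name = getattr(inc, "__name__", "<unknown>")
```

The same `getattr(..., default)` guard applies to `__module__` when the wrapped object might not define it.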
Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]
pan3793 commented on code in PR #46184: URL: https://github.com/apache/spark/pull/46184#discussion_r1577131302 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala: ## @@ -73,7 +73,8 @@ private[spark] class KubernetesDriverBuilder { new HadoopConfDriverFeatureStep(conf), new KerberosConfDriverFeatureStep(conf), new PodTemplateConfigMapStep(conf), - new LocalDirsFeatureStep(conf)) ++ userFeatures + new LocalDirsFeatureStep(conf), + new DriverIngressFeatureStep(conf)) ++ userFeatures Review Comment: that's a good point. yes, any failure during resource creation will fail the job submission at the current stage, and I did see ingress creation failure cases internally. we mitigate it by providing a switch, `spark.kubernetes.driver.ingress.enabled`, that allows users to disable creating the ingress for certain jobs. currently, we create resources in batches, https://github.com/apache/spark/blob/48e207f4a2192d474f2c0f141b984ef0c36a78c3/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L161-L181 if we want to make some resources optional, we need to add a new batch to handle them independently. meanwhile, a new method may need to be added to `KubernetesFeatureConfigStep`, something like ``` trait KubernetesFeatureConfigStep { ... def getAdditionalOptionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } ``` @dongjoon-hyun WDYT?
[PR] [SPARK-47965][CORE] Avoid orNull in TypedConfigBuilder [spark]
HyukjinKwon opened a new pull request, #46197: URL: https://github.com/apache/spark/pull/46197 ### What changes were proposed in this pull request? This PR proposes to avoid `orNull` in `TypedConfigBuilder`. Keys and values cannot be set to `null` anyway; see `RuntimeConfig` and `SparkConf`. ### Why are the changes needed? For code cleanup. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? CI in this PR should verify them. ### Was this patch authored or co-authored using generative AI tooling? No.
Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]
pan3793 commented on code in PR #46184: URL: https://github.com/apache/spark/pull/46184#discussion_r1577138226 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala: ## @@ -73,7 +73,8 @@ private[spark] class KubernetesDriverBuilder { new HadoopConfDriverFeatureStep(conf), new KerberosConfDriverFeatureStep(conf), new PodTemplateConfigMapStep(conf), - new LocalDirsFeatureStep(conf)) ++ userFeatures + new LocalDirsFeatureStep(conf), + new DriverIngressFeatureStep(conf)) ++ userFeatures Review Comment: on the other hand, silently ignoring a resource that the user requested may confuse users. for example, if something goes wrong during the spark UI start, we fail the job immediately.
Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]
pan3793 commented on code in PR #46184: URL: https://github.com/apache/spark/pull/46184#discussion_r1577149932 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala: ## @@ -108,5 +101,5 @@ private[spark] object DriverServiceFeatureStep { val DRIVER_BIND_ADDRESS_KEY = config.DRIVER_BIND_ADDRESS.key val DRIVER_HOST_KEY = config.DRIVER_HOST_ADDRESS.key val DRIVER_SVC_POSTFIX = "-driver-svc" - val MAX_SERVICE_NAME_LENGTH = KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH + val MAX_SERVICE_NAME_LENGTH = KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH - "-ingress".length Review Comment: makes sense, we can generate the ingress name independently to avoid such a regression
Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]
pan3793 commented on code in PR #46184: URL: https://github.com/apache/spark/pull/46184#discussion_r1577134736 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala: ## @@ -87,16 +77,19 @@ private[spark] class DriverServiceFeatureStep( .withName(DRIVER_PORT_NAME) .withPort(driverPort) .withNewTargetPort(driverPort) + .withProtocol("TCP") Review Comment: OK, thanks for advising, will remove it later
Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]
pan3793 commented on code in PR #46184: URL: https://github.com/apache/spark/pull/46184#discussion_r1577106837 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverIngressFeatureStep.scala: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.features + +import scala.jdk.CollectionConverters._ + +import io.fabric8.kubernetes.api.model.HasMetadata +import io.fabric8.kubernetes.api.model.networking.v1.{HTTPIngressPathBuilder, IngressBuilder, IngressRuleBuilder, ServiceBackendPortBuilder} + +import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants.SPARK_APP_ID_LABEL +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.UI._ + +class DriverIngressFeatureStep(kubernetesConf: KubernetesDriverConf) + extends KubernetesFeatureConfigStep with Logging { + + private lazy val driverServiceName: String = kubernetesConf.driverServiceName + + override def configurePod(pod: SparkPod): SparkPod = pod + + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { +if (!kubernetesConf.get(UI_ENABLED) || !kubernetesConf.get(KUBERNETES_INGRESS_ENABLED)) { + logInfo( +s"Skip building ingress for Spark UI, due to" + + s"${UI_ENABLED.key} or ${KUBERNETES_INGRESS_ENABLED.key} is false.") + return Seq.empty +} + +val appId = kubernetesConf.appId +val uiPort = kubernetesConf.get(UI_PORT) +val ingressHost = kubernetesConf.get(KUBERNETES_INGRESS_HOST_PATTERN) match { + case Some(ingressHostPattern) => +ingressHostPattern.replace("{{APP_ID}}", appId) + case None => +logWarning(s"Skip building ingress for Spark UI, due to " + + s"${KUBERNETES_INGRESS_HOST_PATTERN.key} is absent.") +return Seq.empty +} + +val customLabels = KubernetesUtils.parsePrefixedKeyValuePairs( + kubernetesConf.sparkConf, + KUBERNETES_DRIVER_INGRESS_LABEL_PREFIX) +val labels = customLabels ++ Map(SPARK_APP_ID_LABEL -> appId) + +val annotations = KubernetesUtils.parsePrefixedKeyValuePairs( + kubernetesConf.sparkConf, + KUBERNETES_DRIVER_INGRESS_ANNOTATION_PREFIX) + +val path = new HTTPIngressPathBuilder() + .withPath(kubernetesConf.get(KUBERNETES_INGRESS_PATH)) + 
.withPathType(kubernetesConf.get(KUBERNETES_INGRESS_PATH_TYPE)) + .withNewBackend() +.withNewService() + .withName(driverServiceName) + .withPort(new ServiceBackendPortBuilder().withNumber(uiPort).build()) +.endService() + .endBackend() + .build() + +val uiRule = new IngressRuleBuilder() + .withHost(ingressHost) + .withNewHttp() +.addToPaths(path) + .endHttp() + .build() + +val ingress = new IngressBuilder() Review Comment: yes, `OwnerReference` is handled at https://github.com/apache/spark/blob/48e207f4a2192d474f2c0f141b984ef0c36a78c3/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L175 I checked that deleting the driver pod removes the ingress too -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[PR] [SPARK-47771][PYTHON][DOCS][TESTS][FOLLOWUP] Make `max_by, min_by` doctests deterministic [spark]
zhengruifeng opened a new pull request, #46196: URL: https://github.com/apache/spark/pull/46196 ### What changes were proposed in this pull request? Make `max_by, min_by` doctests deterministic ### Why are the changes needed? https://github.com/apache/spark/pull/45939 fixed this issue by sorting the rows, unfortunately, it is not enough: in group `department=Finance`, two rows `("Finance", "Frank", 5)` and `("Finance", "George", 5)` have the same value `years_in_dept=5`, so `min_by("name", "years_in_dept")` and `max_by("name", "years_in_dept")` is still non-deterministic. This test failed in some env: ``` ** File "/home/jenkins/python/pyspark/sql/connect/functions/builtin.py", line 1177, in pyspark.sql.connect.functions.builtin.max_by Failed example: df.groupby("department").agg( sf.max_by("name", "years_in_dept") ).sort("department").show() Expected: +--+---+ |department|max_by(name, years_in_dept)| +--+---+ | Consult| Henry| | Finance| George| +--+---+ Got: +--+---+ |department|max_by(name, years_in_dept)| +--+---+ | Consult| Henry| | Finance| Frank| +--+---+ ** File "/home/jenkins/python/pyspark/sql/connect/functions/builtin.py", line 1205, in pyspark.sql.connect.functions.builtin.min_by Failed example: df.groupby("department").agg( sf.min_by("name", "years_in_dept") ).sort("department").show() Expected: +--+---+ |department|min_by(name, years_in_dept)| +--+---+ | Consult|Eva| | Finance| George| +--+---+ Got: +--+---+ |department|min_by(name, years_in_dept)| +--+---+ | Consult|Eva| | Finance| Frank| +--+---+ ** ``` ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
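The root cause in both doctest fixes is the same: an aggregate like `max_by(value, key)` is only deterministic when the maximum `key` is unique within a group. A minimal pure-Python analogue (plain tuples rather than a real DataFrame; this `max_by` is a toy, not PySpark's function) makes the tie problem concrete:

```python
def max_by(rows, value_col, key_col):
    """Toy version of SQL's max_by: return the value_col entry of the row
    with the largest key_col entry. Like the SQL aggregate, it gives no
    guarantee about which row wins a tie."""
    best = max(rows, key=lambda row: row[key_col])
    return best[value_col]


# Non-deterministic fixture: Frank and George tie on years_in_dept=5, so
# max_by("name", "years_in_dept") may legally return either name, and the
# doctest's expected output becomes a coin flip across environments.
tied = [("Finance", "Frank", 5), ("Finance", "George", 5)]

# Deterministic fixture: unique years_in_dept values admit exactly one
# answer, which is the shape the PR moves the test data to.
unique = [("Finance", "Frank", 5), ("Finance", "George", 8)]

print(max_by(unique, value_col=1, key_col=2))  # prints "George"
```

The same reasoning applies to `min_by` and to the `test_creation_index` fix in the parent PR: test fixtures with unique ordering keys are the only ones whose expected output is stable in every environment.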
Re: [PR] [WIP][SPARK-24815] [CORE] Trigger Interval based DRA for Structured Streaming [spark]
pkotikalapudi commented on code in PR #42352: URL: https://github.com/apache/spark/pull/42352#discussion_r1577078911 ## core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala: ## @@ -340,6 +385,45 @@ private[spark] class ExecutorAllocationManager( } } + /** + * Maximum number of executors to be removed per dra evaluation. + * + * This function limits the number of idle executors to be removed if the + * `streamingDRAFeatureEnabled` flag is enabled, otherwise it returns all the idle executors. + * The number of executors to be removed is based on + * the de-allocation ratio (`executorDeallocationRatio`) and timeout (`removeTime`). + * It helps in removing only few executors per evaluation cycle and helps in gradual removal of + * executors across micro-batches. + */ + private def maxExecutorDeallocationsPerEvaluation( + timedOutExecs : Seq[(String, Int)] + ): Seq[(String, Int)] = { +if (!streamingDRAFeatureEnabled) { + timedOutExecs +} +else { + val currentTime = clock.nanoTime() + + if (removeTime == NOT_SET) { Review Comment: Good catch. I will update it to `Seq.empty[(String, Int)]`. Every evaluation happens at a fixed rate of few hundred milliseconds. So all those executors will be evaluated in the next round -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
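The gradual-removal behaviour described in that comment — remove only a few idle executors per evaluation cycle and leave the rest for the next cycle, a few hundred milliseconds later — can be sketched as follows. This is a hypothetical simplification, not the real `maxExecutorDeallocationsPerEvaluation` implementation, and the `deallocation_ratio` parameter name is invented here:

```python
import math


def executors_to_remove(idle_executors, total_executors, deallocation_ratio):
    """Cap how many idle executors are released in one evaluation cycle.

    Instead of releasing every timed-out executor at once, release at most
    ceil(deallocation_ratio * total_executors) of them; the remainder stays
    idle and is re-evaluated on the next cycle.
    """
    if not idle_executors:
        return []
    cap = max(1, math.ceil(deallocation_ratio * total_executors))
    return idle_executors[:cap]


# 10 executors, 6 of them idle, ratio 0.2 -> at most 2 removed this cycle;
# the other 4 are picked up by later evaluations.
print(executors_to_remove(["e1", "e2", "e3", "e4", "e5", "e6"], 10, 0.2))
# -> ['e1', 'e2']
```

Spreading removals across cycles is what makes the scale-back gradual across micro-batches rather than a cliff after one idle-timeout expiry.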
[PR] [SPARK-47964][PYTHON][CONNECT] Hide SQLContext and HiveContext in pyspark-connect [spark]
HyukjinKwon opened a new pull request, #46194: URL: https://github.com/apache/spark/pull/46194 ### What changes were proposed in this pull request? This PR hides SQLContext and HiveContext in `pyspark-connect`. They do not exist there. ### Why are the changes needed? To recover https://github.com/apache/spark/actions/runs/8805830494 ### Does this PR introduce _any_ user-facing change? No, the main change has not been released yet. ### How was this patch tested? Manually. ### Was this patch authored or co-authored using generative AI tooling? No. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-45302][PYTHON] Remove PID communication between Python workers when no demon is used [spark]
HyukjinKwon commented on PR #43087: URL: https://github.com/apache/spark/pull/43087#issuecomment-2073737492 I am going to revert this for now - I am cleaning up and refactoring workers. I will bring this back in again later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [WIP][SPARK-24815] [CORE] Trigger Interval based DRA for Structured Streaming [spark]
pkotikalapudi commented on code in PR #42352: URL: https://github.com/apache/spark/pull/42352#discussion_r1577058026 ## core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala: ## @@ -340,6 +385,45 @@ private[spark] class ExecutorAllocationManager( } } + /** + * Maximum number of executors to be removed per dra evaluation. Review Comment: It can be useful for batch jobs as well. Like the allocation ratio, the de-allocation ratio is also relevant when scaling out/back only a few executors at a time, especially in cases where stages of a batch job have alternating high/low task counts. I restricted this behind `streamingDRAFeatureEnabled` initially for feature distinction. I can modify the code to have this as a config which can be used in traditional DRA as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [MINOR][DOCS] Fix type hint of 3 functions [spark]
zhengruifeng commented on PR #46179: URL: https://github.com/apache/spark/pull/46179#issuecomment-2073713053 thank you @dongjoon-hyun for reviews -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47921][CONNECT] Fix ExecuteJobTag creation in ExecuteHolder [spark]
ueshin commented on code in PR #46140: URL: https://github.com/apache/spark/pull/46140#discussion_r1577043660 ## connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala: ## @@ -37,7 +37,7 @@ class SparkConnectServerListenerSuite private var kvstore: ElementTrackingStore = _ - private val jobTag = ExecuteJobTag("sessionId", "userId", "operationId") + private val jobTag = ExecuteJobTag("userId", "sessionId", "operationId") Review Comment: The original fix is https://github.com/apache/spark/commit/dd6636b4e076c1c7fd3569ad509d28524a98d156. I'm fine with it too, but I asked https://github.com/apache/spark/pull/46140#issuecomment-2073342960 because I was worried about the inconsistency with the similar API. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
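The bug class being fixed in this test — two same-typed string parameters (`userId`, `sessionId`) silently swapped at a call site — is a good argument for keyword-only parameters where a language supports them. A hedged Python illustration (the tag layout below is invented for the example, not Spark Connect's actual format):

```python
def execute_job_tag(*, user_id: str, session_id: str, operation_id: str) -> str:
    """Build a job tag from its three ids.

    All three parameters are plain strings, so a positional call site can
    silently swap user_id and session_id -- the keyword-only signature
    turns that mistake into an immediate TypeError instead.
    """
    return f"User_{user_id}_Session_{session_id}_Operation_{operation_id}"


tag = execute_job_tag(user_id="userId", session_id="sessionId",
                      operation_id="operationId")
print(tag)  # prints "User_userId_Session_sessionId_Operation_operationId"
```

Scala offers the same protection via named arguments, but only if call sites opt in; a test that asserts the rendered tag, as this suite does, is the other line of defence.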
Re: [PR] [SPARK-47764][CORE][SQL] Cleanup shuffle dependencies based on ShuffleCleanupMode [spark]
bozhang2820 commented on code in PR #45930: URL: https://github.com/apache/spark/pull/45930#discussion_r1575960906 ## core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala: ## @@ -187,6 +187,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager shuffleBlockResolver.removeDataByMap(shuffleId, mapTaskId) } } +shuffleBlockResolver.removeShuffleToSkip(shuffleId) Review Comment: Yeah this is a bit weird... Changed to use a Guava cache with a fixed maximum size (1000) instead, so that we do not need to do cleanups for shufflesToSkip. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
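The fixed-size-cache idea mentioned above — a Guava cache with a maximum size of 1000, so stale shuffle ids age out without explicit cleanup — can be sketched with a small bounded set. This is a conceptual Python analogue, not the actual `SortShuffleManager` change; the class and method names are invented here:

```python
from collections import OrderedDict


class BoundedIdSet:
    """Fixed-capacity set of shuffle ids with oldest-first eviction.

    Once the cap is reached, adding a new id silently drops the oldest
    one, so no explicit per-shuffle cleanup call is needed.
    """

    def __init__(self, max_size=1000):
        self.max_size = max_size
        self._ids = OrderedDict()

    def add(self, shuffle_id):
        self._ids[shuffle_id] = None
        self._ids.move_to_end(shuffle_id)
        while len(self._ids) > self.max_size:
            self._ids.popitem(last=False)  # evict the oldest entry

    def __contains__(self, shuffle_id):
        return shuffle_id in self._ids


skip = BoundedIdSet(max_size=3)
for shuffle_id in range(5):
    skip.add(shuffle_id)
print(sorted(skip._ids))  # prints [2, 3, 4]: ids 0 and 1 were evicted
```

Trading exactness for a bounded footprint is the point: a rarely-hit stale entry is harmless here, while an unbounded `shufflesToSkip` set would need the very cleanup hooks the review found awkward.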
Re: [PR] [SPARK-47963][CORE][TESTS] Add an external LogKey usage case in UT [spark]
panbingkun commented on PR #46193: URL: https://github.com/apache/spark/pull/46193#issuecomment-2073684346 cc @gengliangwang @LuciferYang -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[PR] [SPARK-47963][CORE][TESTS] Add an external LogKey usage case in UT [spark]
panbingkun opened a new pull request, #46193: URL: https://github.com/apache/spark/pull/46193 ### What changes were proposed in this pull request? The pr aims to add `an external LogKey` usage case in UT. ### Why are the changes needed? Give an example to developers who use the LogKey mechanism in external system. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Manually test. - Pass GA. ### Was this patch authored or co-authored using generative AI tooling? No. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]
HyukjinKwon commented on PR #46125: URL: https://github.com/apache/spark/pull/46125#issuecomment-2073665401 Merged to master. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]
HyukjinKwon closed pull request #46125: [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark URL: https://github.com/apache/spark/pull/46125 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [WIP][SPARK-24815] [CORE] Trigger Interval based DRA for Structured Streaming [spark]
pkotikalapudi commented on code in PR #42352: URL: https://github.com/apache/spark/pull/42352#discussion_r1577025511 ## core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala: ## @@ -96,6 +96,30 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} * If an executor with caching data blocks has been idle for more than this duration, * the executor will be removed + + * Dynamic resource allocation is also extended to work for structured streaming use case. + * (micro-batch paradigm). + * For it to work we would still need the above configs + few additional configs. + * + * For executor allocation, In traditional DRA target number of executors are added based on the Review Comment: From my understanding, in batch queries each stage can have varied resource requirements depending upon what it does. So DRA has `schedulerBacklogTimeout` to figure out when it should ask for more resources (see the `spark.dynamicAllocation.schedulerBacklogTimeout` documentation). The [pendingTasks](https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L289) are determined by [the pending tasks of the current stage](https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L883). I have [modified it](https://github.com/apache/spark/pull/42352/files#diff-fdddb0421641035be18233c212f0e3ccd2d6a49d345bd0cd4eac08fc4d911e21R1003) to consider the pending tasks of other stages as well, because structured streaming deals with micro-batches and we want to scale out if there are still other stages pending in the same micro-batch. For example: with the current DRA code, if `spark.dynamicAllocation.schedulerBacklogTimeout` is set to 6 seconds and we use that for a structured streaming job where a micro-batch consists of 4 stages that each run for at most 5 seconds, then it wouldn't scale out even after 20 seconds pass, because the backlog timer restarts with each stage and no single stage stays backlogged for 6 seconds (the whole micro-batch is just 5+5+5+5 = 20 seconds).
But with the changes I have made, while running the second stage on the 6th second the allocation manager figures out that other stages in the micro-batch are still pending, so it scales out appropriately. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
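The 4-stages-of-5-seconds example in that comment can be made concrete with a toy backlog check. This is a deliberately oversimplified, hypothetical model (the real `ExecutorAllocationManager` logic is far more involved), but it shows why a per-stage backlog clock never fires while a cross-stage one does:

```python
def should_scale_out(pending_current, pending_later_stages, backlog_seconds,
                     backlog_timeout_s, include_other_stages):
    """Decide whether to request more executors.

    pending_current: pending tasks of the currently running stage.
    pending_later_stages: pending tasks of later stages in the micro-batch.
    backlog_seconds: how long a backlog has been continuously observed.
    With include_other_stages=False (classic DRA), the backlog effectively
    restarts at each stage boundary; with True (the streaming change
    discussed above), later stages keep the backlog alive across it.
    """
    pending = pending_current
    if include_other_stages:
        pending += pending_later_stages
    return pending > 0 and backlog_seconds >= backlog_timeout_s


# Classic DRA: each of the 4 stages runs at most 5s, so the observed
# backlog never reaches the 6s timeout before the clock restarts.
print(should_scale_out(20, 0, backlog_seconds=5,
                       backlog_timeout_s=6, include_other_stages=False))  # False

# Streaming change: on the 6th second the 2nd stage is running, and the
# two remaining stages' pending tasks carry the backlog past the boundary.
print(should_scale_out(20, 40, backlog_seconds=6,
                       backlog_timeout_s=6, include_other_stages=True))  # True
```

The sketch also shows the quiescent case: once no stage in the micro-batch has pending tasks, neither mode requests executors, however long the trigger interval is.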
Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]
sahnib commented on code in PR #45376: URL: https://github.com/apache/spark/pull/45376#discussion_r1577018044 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateWatermarkSuite.scala: ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.sql.Timestamp +import java.time.{Instant, LocalDateTime, ZoneId} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.ExtendedAnalysisException +import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecution} +import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider +import org.apache.spark.sql.functions.window +import org.apache.spark.sql.internal.SQLConf + +class ColumnRenamedStatefulProcessor + extends StatefulProcessor[String, InputEventRow, RenamedInputEventRow] + with Logging { + + override def init(outputMode: OutputMode): Unit = { } Review Comment: I dont think we need specifically state variables, as we only deal with the event time of output generated. Let me know if you think otherwise. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]
sahnib commented on code in PR #45376: URL: https://github.com/apache/spark/pull/45376#discussion_r1577017638 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala: ## @@ -107,25 +109,67 @@ case class EventTimeWatermarkExec( } // Update the metadata on the eventTime column to include the desired delay. - override val output: Seq[Attribute] = child.output.map { a => -if (a semanticEquals eventTime) { - val updatedMetadata = new MetadataBuilder() -.withMetadata(a.metadata) -.putLong(EventTimeWatermark.delayKey, delayMs) -.build() - a.withMetadata(updatedMetadata) -} else if (a.metadata.contains(EventTimeWatermark.delayKey)) { - // Remove existing watermark - val updatedMetadata = new MetadataBuilder() -.withMetadata(a.metadata) -.remove(EventTimeWatermark.delayKey) -.build() - a.withMetadata(updatedMetadata) -} else { - a -} + override val output: Seq[Attribute] = { +val delayMs = EventTimeWatermark.getDelayMs(delay) +updateEventTimeColumn(child.output, delayMs, eventTime) } override protected def withNewChildInternal(newChild: SparkPlan): EventTimeWatermarkExec = copy(child = newChild) } + +/** + * Updates the event time column to [[eventTime]] in the child output. + * Any watermark calculations performed after this node will use the + * updated eventTimeColumn. + * + * This node also ensures that output emitted by the child node adheres + * to watermark. If the child node emits rows which are older than global + * watermark, the node will throw an query execution error and fail the user + * query. 
+ */ +case class UpdateEventTimeColumnExec( +eventTime: Attribute, +delay: CalendarInterval, +eventTimeWatermarkForEviction: Option[Long], +child: SparkPlan) extends UnaryExecNode with Logging { + + override protected def doExecute(): RDD[InternalRow] = { +child.execute().mapPartitions[InternalRow]( + dataIterator => { +val watermarkExpression = WatermarkSupport.watermarkExpression( + Some(eventTime), eventTimeWatermarkForEviction) + +if (watermarkExpression.isEmpty) { + // watermark should always be defined in this node. + throw QueryExecutionErrors.cannotGetEventTimeWatermarkError() +} + +val predicate = Predicate.create(watermarkExpression.get, child.output) +new Iterator[InternalRow] { + override def hasNext: Boolean = dataIterator.hasNext + override def next(): InternalRow = { +val nextRow = dataIterator.next() +if (predicate.eval(nextRow)) { + // child node emitted a row which is older than current watermark + // which is not allowed + throw QueryExecutionErrors.emittedRowsAreOlderThanWatermark( +eventTimeWatermarkForEviction.get) +} +nextRow + } +} + }, + preservesPartitioning = true Review Comment: As per my understanding, this preserves the partitioning from the child to upstream operators - and we want to keep that partitioning - because UpdateEventTimeColumnExec node does not modify the order of input rows. However, I am confused by Javadoc of `mapPartitions` suggesting this parameter should only used for PairRDDs. @HeartSaVioR do you have more context here? ## sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala: ## @@ -739,6 +741,128 @@ class KeyValueGroupedDataset[K, V] private[sql]( ) } + /** + * (Scala-specific) + * Invokes methods defined in the stateful processor used in arbitrary state API v2. + * We allow the user to act on per-group set of input rows along with keyed state and the + * user can choose to output/return 0 or more rows. 
+ * For a streaming dataframe, we will repeatedly invoke the interface methods for new rows + * in each trigger and the user's state/state variables will be stored persistently across + * invocations. + * + * @tparam U The type of the output objects. Must be encodable to Spark SQL types. + * @param statefulProcessor Instance of statefulProcessor whose functions will + * be invoked by the operator. + * @param timeMode The time mode semantics of the stateful processor for timers and TTL. + * @param eventTimeColumnName eventTime column in the output dataset. Any operations after + *transformWithState will use the new eventTimeColumn. The user + *needs to ensure that the eventTime for emitted output adheres to + *the watermark boundary, otherwise streaming query will fail. + * @param outputModeThe output mode of the stateful processor. + * + * See [[Encoder]] for more details on what types are encodable to
Re: [PR] [REFERENCE][DO-NOT-MERGE] Implementation of Python streaming data source [spark]
chaoqin-li1123 closed pull request #45065: [REFERENCE][DO-NOT-MERGE] Implementation of Python streaming data source URL: https://github.com/apache/spark/pull/45065 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWIthState operator. [spark]
sahnib commented on code in PR #45376: URL: https://github.com/apache/spark/pull/45376#discussion_r1577007339 ## sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala: ## @@ -739,6 +741,128 @@ class KeyValueGroupedDataset[K, V] private[sql]( ) } + /** + * (Scala-specific) + * Invokes methods defined in the stateful processor used in arbitrary state API v2. + * We allow the user to act on per-group set of input rows along with keyed state and the + * user can choose to output/return 0 or more rows. + * For a streaming dataframe, we will repeatedly invoke the interface methods for new rows + * in each trigger and the user's state/state variables will be stored persistently across + * invocations. + * + * @tparam U The type of the output objects. Must be encodable to Spark SQL types. + * @param statefulProcessor Instance of statefulProcessor whose functions will + * be invoked by the operator. + * @param timeMode The time mode semantics of the stateful processor for timers and TTL. + * @param eventTimeColumnName eventTime column in the output dataset. Any operations after + *transformWithState will use the new eventTimeColumn. The user + *needs to ensure that the eventTime for emitted output adheres to + *the watermark boundary, otherwise streaming query will fail. + * @param outputModeThe output mode of the stateful processor. + * + * See [[Encoder]] for more details on what types are encodable to Spark SQL. 
+ */ + private[sql] def transformWithState[U: Encoder]( + statefulProcessor: StatefulProcessor[K, V, U], + timeMode: TimeMode, + eventTimeColumnName: String, + outputMode: OutputMode): Dataset[U] = { +val existingWatermarkDelay = logicalPlan.flatMap { + case EventTimeWatermark(_, delay, _) => Seq(delay) + case _ => Seq() +} + +if (existingWatermarkDelay.isEmpty) { + throw QueryCompilationErrors.cannotAssignEventTimeColumn() +} + +val transformWithState = TransformWithState[K, V, U]( + groupingAttributes, + dataAttributes, + statefulProcessor, + timeMode, + outputMode, + child = logicalPlan +) + +val twsDS = Dataset[U]( + sparkSession, + transformWithState +) + +val delay = existingWatermarkDelay.head + +Dataset[U](sparkSession, EliminateEventTimeWatermark( + UpdateEventTimeWatermarkColumn( +UnresolvedAttribute(eventTimeColumnName), +delay, +twsDS.logicalPlan))) + } + + /** + * (Scala-specific) + * Invokes methods defined in the stateful processor used in arbitrary state API v2. + * Functions as the function above, but with additional initial state. + * + * @tparam U The type of the output objects. Must be encodable to Spark SQL types. + * @tparam S The type of initial state objects. Must be encodable to Spark SQL types. + * @param statefulProcessor Instance of statefulProcessor whose functions will + *be invoked by the operator. + * @param timeModeThe time mode semantics of the stateful processor for + *timers and TTL. + * @param eventTimeColumnName eventTime column in the output dataset. Any operations after + *transformWithState will use the new eventTimeColumn. The user + *needs to ensure that the eventTime for emitted output adheres to + *the watermark boundary, otherwise streaming query will fail. + * @param outputMode The output mode of the stateful processor. + * @param initialStateUser provided initial state that will be used to initiate state for + *the query in the first batch. 
+ * + * See [[Encoder]] for more details on what types are encodable to Spark SQL. + */ + private[sql] def transformWithState[U: Encoder, S: Encoder]( + statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S], + timeMode: TimeMode, + eventTimeColumnName: String, + outputMode: OutputMode, + initialState: KeyValueGroupedDataset[K, S]): Dataset[U] = { +val existingWatermarkDelay = logicalPlan.collect { + case EventTimeWatermark(_, delay, _) => delay +} + +if (existingWatermarkDelay.isEmpty) { Review Comment: added a private method to consolidate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe,
Re: [PR] [SPARK-47921][CONNECT] Fix ExecuteJobTag creation in ExecuteHolder [spark]
gengliangwang commented on code in PR #46140: URL: https://github.com/apache/spark/pull/46140#discussion_r1577006944 ## connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala: ## @@ -37,7 +37,7 @@ class SparkConnectServerListenerSuite private var kvstore: ElementTrackingStore = _ - private val jobTag = ExecuteJobTag("sessionId", "userId", "operationId") + private val jobTag = ExecuteJobTag("userId", "sessionId", "operationId") Review Comment: From the code changes, there does seem to be a bug. All the callers are using the correct order. This PR seems just to unify the usages for such functions.
Re: [PR] [CONNECT] Use v1 as spark connect go library starting version [spark-connect-go]
viirya commented on PR #19: URL: https://github.com/apache/spark-connect-go/pull/19#issuecomment-2073619182 Got it. I didn't know the actual meaning of `v34`, so I thought that you had v1, v2, ... v34 before. I am also not sure why the version number is embedded into the Go module name. Is it a common practice seen in other Go modules too? So once this Go library has a v2 release, will you increase `v1` to `v2` in the module name again? I am not familiar with the Go module naming policy. I tried to compare this with JVM package names and release versions. For example, we don't actually have 3.4 in the Spark JVM package name. So I'm wondering if it makes sense to remove the version number `v34` from this Go module name? If it is commonly seen in other Go modules, then it is okay to rename it from `v34` to `v1`. It is also better to let other contributors of this Go library take a look too.
Re: [PR] [SPARK-47921][CONNECT] Fix ExecuteJobTag creation in ExecuteHolder [spark]
ueshin commented on PR #46140: URL: https://github.com/apache/spark/pull/46140#issuecomment-2073596694 Also cc @gengliangwang @juliuszsompolski
Re: [PR] [CONNECT] Use v1 as spark connect go library starting version [spark-connect-go]
hiboyang commented on PR #19: URL: https://github.com/apache/spark-connect-go/pull/19#issuecomment-2073588176 > > Use v1 for this Spark Connect Go Client library, instead of previously using v34. > > For the Spark K8s operator, we began with 0.1.0 as the first release version, as discussed in the mailing list. > > For the v1 and v34 versions here, I'm not sure if it is the same case. Do you already have a v34 release? Hmm... I feel there is some confusion about the meaning of `v1` and `v34` here. `v34` is part of the Go module name, like `github.com/apache/spark-connect-go/v34`. It is not a release. Following the recent discussion, we want to decouple the versioning of this project from Spark's versioning, and thus modify the module name from `github.com/apache/spark-connect-go/v34` to `github.com/apache/spark-connect-go/v1`.
Re: [PR] [CONNECT] Use v1 as spark connect go library starting version [spark-connect-go]
hiboyang commented on code in PR #19: URL: https://github.com/apache/spark-connect-go/pull/19#discussion_r1576977935 ## go.mod: ## @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -module github.com/apache/spark-connect-go/v34 +module github.com/apache/spark-connect-go/v1 Review Comment: Previously we used `v34` to align it with Spark 3.4 at that time. It is not an accumulated version number. Things will still work after changing `v34` to `v1`.
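For readers not steeped in Go's module system: the major version appears in the module path because of Go's semantic import versioning, where incompatible major releases get distinct import paths. A hedged sketch with illustrative paths (not this repository's actual layout); note that standard Go tooling requires the `/vN` path suffix only for N >= 2, and v0/v1 modules conventionally omit the suffix, so the exact `/v1` spelling proposed here is worth double-checking against the Go modules reference:

```go
// go.mod of a hypothetical library released at major version 2 or later:
// the version suffix is part of the module identity, not a release tag.
module example.com/mylib/v2

go 1.21
```

A consumer then imports it with the suffixed path, e.g. `import "example.com/mylib/v2"`, and can even depend on `example.com/mylib` (v0/v1) and `example.com/mylib/v2` side by side, because Go treats them as different modules.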
Re: [PR] [CONNECT] Use v1 as spark connect go library starting version [spark-connect-go]
hiboyang commented on PR #19: URL: https://github.com/apache/spark-connect-go/pull/19#issuecomment-2073580127 > Does this Spark connect go library have any previous release yet? No release previously. For a Go repo, people can just import the repo directly, so such "releasing" is actually not required.
Re: [PR] [SPARK-47583][CORE] SQL core: Migrate logError with variables to structured logging framework [spark]
gengliangwang commented on code in PR #45969: URL: https://github.com/apache/spark/pull/45969#discussion_r1576968211 ## sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala: ## @@ -74,14 +76,18 @@ case class AdaptiveSparkPlanExec( @transient private val lock = new Object() - @transient private val logOnLevel: ( => String) => Unit = conf.adaptiveExecutionLogLevel match { -case "TRACE" => logTrace(_) -case "DEBUG" => logDebug(_) -case "INFO" => logInfo(_) -case "WARN" => logWarning(_) -case "ERROR" => logError(_) -case _ => logDebug(_) - } + @transient private val logOnLevel: ( => MessageWithContext) => Unit = +conf.adaptiveExecutionLogLevel match { + case "TRACE" => +def fn(log: => LogEntry): Unit = logTrace(log.message) Review Comment: There is a `logTrace` that accepts a `MessageWithContext` now.
Re: [PR] [SPARK-47583][CORE] SQL core: Migrate logError with variables to structured logging framework [spark]
gengliangwang commented on code in PR #45969: URL: https://github.com/apache/spark/pull/45969#discussion_r1576967703 ## sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala: ## @@ -353,8 +359,10 @@ case class AdaptiveSparkPlanExec( val newCost = costEvaluator.evaluateCost(newPhysicalPlan) if (newCost < origCost || (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) { -logOnLevel("Plan changed:\n" + - sideBySide(currentPhysicalPlan.treeString, newPhysicalPlan.treeString).mkString("\n")) +val plans = + sideBySide(currentPhysicalPlan.treeString, newPhysicalPlan.treeString).mkString("\n") +logOnLevel(log"Plan changed:\n" + + log"${MDC(QUERY_PLAN, plans)}") Review Comment: ```suggestion lazy val plans = sideBySide(currentPhysicalPlan.treeString, newPhysicalPlan.treeString).mkString("\n") logOnLevel(log"Plan changed:\n${MDC(QUERY_PLAN, plans)}") ```
Re: [PR] [SPARK-47597][STREAMING] Streaming: Migrate logInfo with variables to structured logging framework [spark]
dtenedor commented on PR #46192: URL: https://github.com/apache/spark/pull/46192#issuecomment-2073520844 @gengliangwang FYI
[PR] [SPARK-47597][STREAMING] Streaming: Migrate logInfo with variables to structured logging framework [spark]
dtenedor opened a new pull request, #46192: URL: https://github.com/apache/spark/pull/46192 ### What changes were proposed in this pull request? Migrate logInfo with variables of the streaming module to structured logging framework. This transforms the logInfo entries of the following API ``` def logInfo(msg: => String): Unit ``` to ``` def logInfo(entry: LogEntry): Unit ``` ### Why are the changes needed? To enhance Apache Spark's logging system by implementing structured logging. ### Does this PR introduce _any_ user-facing change? Yes, Spark core logs will contain additional MDC ### How was this patch tested? Compiler and scala style checks, as well as code review. ### Was this patch authored or co-authored using generative AI tooling? No
Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]
dongjoon-hyun commented on code in PR #46184: URL: https://github.com/apache/spark/pull/46184#discussion_r1576880695 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverIngressFeatureStep.scala: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.features + +import scala.jdk.CollectionConverters._ + +import io.fabric8.kubernetes.api.model.HasMetadata +import io.fabric8.kubernetes.api.model.networking.v1.{HTTPIngressPathBuilder, IngressBuilder, IngressRuleBuilder, ServiceBackendPortBuilder} + +import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants.SPARK_APP_ID_LABEL +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.UI._ + +class DriverIngressFeatureStep(kubernetesConf: KubernetesDriverConf) + extends KubernetesFeatureConfigStep with Logging { + + private lazy val driverServiceName: String = kubernetesConf.driverServiceName + + override def configurePod(pod: SparkPod): SparkPod = pod + + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { +if (!kubernetesConf.get(UI_ENABLED) || !kubernetesConf.get(KUBERNETES_INGRESS_ENABLED)) { + logInfo( +s"Skip building ingress for Spark UI, due to" + + s"${UI_ENABLED.key} or ${KUBERNETES_INGRESS_ENABLED.key} is false.") + return Seq.empty +} + +val appId = kubernetesConf.appId +val uiPort = kubernetesConf.get(UI_PORT) +val ingressHost = kubernetesConf.get(KUBERNETES_INGRESS_HOST_PATTERN) match { + case Some(ingressHostPattern) => +ingressHostPattern.replace("{{APP_ID}}", appId) + case None => +logWarning(s"Skip building ingress for Spark UI, due to " + + s"${KUBERNETES_INGRESS_HOST_PATTERN.key} is absent.") +return Seq.empty +} + +val customLabels = KubernetesUtils.parsePrefixedKeyValuePairs( + kubernetesConf.sparkConf, + KUBERNETES_DRIVER_INGRESS_LABEL_PREFIX) +val labels = customLabels ++ Map(SPARK_APP_ID_LABEL -> appId) + +val annotations = KubernetesUtils.parsePrefixedKeyValuePairs( + kubernetesConf.sparkConf, + KUBERNETES_DRIVER_INGRESS_ANNOTATION_PREFIX) + +val path = new HTTPIngressPathBuilder() + .withPath(kubernetesConf.get(KUBERNETES_INGRESS_PATH)) + 
.withPathType(kubernetesConf.get(KUBERNETES_INGRESS_PATH_TYPE)) + .withNewBackend() +.withNewService() + .withName(driverServiceName) + .withPort(new ServiceBackendPortBuilder().withNumber(uiPort).build()) +.endService() + .endBackend() + .build() + +val uiRule = new IngressRuleBuilder() + .withHost(ingressHost) + .withNewHttp() +.addToPaths(path) + .endHttp() + .build() + +val ingress = new IngressBuilder() Review Comment: It seems that there is no owner setting here. Could you double-check whether this increases the number of ingresses monotonically? I'm wondering who removes this ingress, and when.
Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]
dongjoon-hyun commented on code in PR #46184: URL: https://github.com/apache/spark/pull/46184#discussion_r1576873744 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala: ## @@ -749,13 +749,52 @@ private[spark] object Config extends Logging { .checkValue(value => value > 0, "Gracefully shutdown period must be a positive time value") .createWithDefaultString("20s") + val KUBERNETES_INGRESS_ENABLED = +ConfigBuilder("spark.kubernetes.driver.ingress.enabled") + .doc("Whether to create ingress entry of the driver service for external UI access " + +"in cluster mode. This only takes effect when `spark.ui.enabled` is true.") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + + val KUBERNETES_INGRESS_HOST_PATTERN = +ConfigBuilder("spark.kubernetes.driver.ingress.host") + .doc("Host for driver UI ingress, {{APP_ID}} will be replaced by application ID.") + .version("4.0.0") + .stringConf + .createOptional + + val KUBERNETES_INGRESS_CLASS_NAME = +ConfigBuilder("spark.kubernetes.driver.ingress.ingressClassName") + .doc("Reference the IngressClass that should be used to implement the ingress for spark UI.") + .version("4.0.0") + .stringConf + .createOptional + + val KUBERNETES_INGRESS_PATH = +ConfigBuilder("spark.kubernetes.driver.ingress.path") Review Comment: May I ask when you would want to override this? And is it safe in terms of security?
Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]
dongjoon-hyun commented on code in PR #46184: URL: https://github.com/apache/spark/pull/46184#discussion_r1576870634 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala: ## @@ -749,13 +749,52 @@ private[spark] object Config extends Logging { .checkValue(value => value > 0, "Gracefully shutdown period must be a positive time value") .createWithDefaultString("20s") + val KUBERNETES_INGRESS_ENABLED = +ConfigBuilder("spark.kubernetes.driver.ingress.enabled") Review Comment: I believe we don't need this because `KUBERNETES_INGRESS_HOST_PATTERN` is always mandatory. Shall we remove this redundant configuration?
Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]
dongjoon-hyun commented on code in PR #46184: URL: https://github.com/apache/spark/pull/46184#discussion_r1576868897 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala: ## @@ -73,7 +73,8 @@ private[spark] class KubernetesDriverBuilder { new HadoopConfDriverFeatureStep(conf), new KerberosConfDriverFeatureStep(conf), new PodTemplateConfigMapStep(conf), - new LocalDirsFeatureStep(conf)) ++ userFeatures + new LocalDirsFeatureStep(conf), + new DriverIngressFeatureStep(conf)) ++ userFeatures Review Comment: Can we provide a way for the Spark job to run successfully even if `DriverIngressFeatureStep` and its K8s resources fail? IIUC, if something goes wrong, the Spark job does not start, right?
Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]
dongjoon-hyun commented on code in PR #46184: URL: https://github.com/apache/spark/pull/46184#discussion_r1576866768 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverIngressFeatureStep.scala: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.features + +import scala.jdk.CollectionConverters._ + +import io.fabric8.kubernetes.api.model.HasMetadata +import io.fabric8.kubernetes.api.model.networking.v1.{HTTPIngressPathBuilder, IngressBuilder, IngressRuleBuilder, ServiceBackendPortBuilder} + +import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants.SPARK_APP_ID_LABEL +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.UI._ + +class DriverIngressFeatureStep(kubernetesConf: KubernetesDriverConf) + extends KubernetesFeatureConfigStep with Logging { + + private lazy val driverServiceName: String = kubernetesConf.driverServiceName + + override def configurePod(pod: SparkPod): SparkPod = pod + + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { +if (!kubernetesConf.get(UI_ENABLED) || !kubernetesConf.get(KUBERNETES_INGRESS_ENABLED)) { + logInfo( +s"Skip building ingress for Spark UI, due to" + + s"${UI_ENABLED.key} or ${KUBERNETES_INGRESS_ENABLED.key} is false.") + return Seq.empty +} + +val appId = kubernetesConf.appId +val uiPort = kubernetesConf.get(UI_PORT) +val ingressHost = kubernetesConf.get(KUBERNETES_INGRESS_HOST_PATTERN) match { + case Some(ingressHostPattern) => +ingressHostPattern.replace("{{APP_ID}}", appId) + case None => +logWarning(s"Skip building ingress for Spark UI, due to " + + s"${KUBERNETES_INGRESS_HOST_PATTERN.key} is absent.") +return Seq.empty Review Comment: This `return` statement implies that we might have too complex a code path here.
Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]
dongjoon-hyun commented on code in PR #46184: URL: https://github.com/apache/spark/pull/46184#discussion_r1576865495 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverIngressFeatureStep.scala: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.features + +import scala.jdk.CollectionConverters._ + +import io.fabric8.kubernetes.api.model.HasMetadata +import io.fabric8.kubernetes.api.model.networking.v1.{HTTPIngressPathBuilder, IngressBuilder, IngressRuleBuilder, ServiceBackendPortBuilder} + +import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants.SPARK_APP_ID_LABEL +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.UI._ + +class DriverIngressFeatureStep(kubernetesConf: KubernetesDriverConf) + extends KubernetesFeatureConfigStep with Logging { + + private lazy val driverServiceName: String = kubernetesConf.driverServiceName + + override def configurePod(pod: SparkPod): SparkPod = pod + + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { +if (!kubernetesConf.get(UI_ENABLED) || !kubernetesConf.get(KUBERNETES_INGRESS_ENABLED)) { + logInfo( +s"Skip building ingress for Spark UI, due to" + + s"${UI_ENABLED.key} or ${KUBERNETES_INGRESS_ENABLED.key} is false.") + return Seq.empty +} + +val appId = kubernetesConf.appId +val uiPort = kubernetesConf.get(UI_PORT) +val ingressHost = kubernetesConf.get(KUBERNETES_INGRESS_HOST_PATTERN) match { Review Comment: Instead of warning like this, please check it together with the condition at line 37 and proceed as a no-op.
Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]
dongjoon-hyun commented on code in PR #46184: URL: https://github.com/apache/spark/pull/46184#discussion_r1576864646 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverIngressFeatureStep.scala: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.features + +import scala.jdk.CollectionConverters._ + +import io.fabric8.kubernetes.api.model.HasMetadata +import io.fabric8.kubernetes.api.model.networking.v1.{HTTPIngressPathBuilder, IngressBuilder, IngressRuleBuilder, ServiceBackendPortBuilder} + +import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants.SPARK_APP_ID_LABEL +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.UI._ + +class DriverIngressFeatureStep(kubernetesConf: KubernetesDriverConf) + extends KubernetesFeatureConfigStep with Logging { + + private lazy val driverServiceName: String = kubernetesConf.driverServiceName + + override def configurePod(pod: SparkPod): SparkPod = pod + + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { +if (!kubernetesConf.get(UI_ENABLED) || !kubernetesConf.get(KUBERNETES_INGRESS_ENABLED)) { + logInfo( Review Comment: Since this is not a bug, please move this `logInfo` out of this PR, with a different message.
Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]
dongjoon-hyun commented on code in PR #46184: URL: https://github.com/apache/spark/pull/46184#discussion_r1576861859 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala: ## @@ -94,6 +96,20 @@ private[spark] class KubernetesDriverConf( custom.getOrElse(KubernetesConf.getResourceNamePrefix(appName)) } + val driverServiceName: String = { +val preferredServiceName = s"$resourceNamePrefix$DRIVER_SVC_POSTFIX" +if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) { + preferredServiceName +} else { + val randomServiceId = KubernetesUtils.uniqueID(clock) + val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX" + logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " + +s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " + +s"$shorterServiceName as the driver service's name.") + shorterServiceName +} + } Review Comment: Could you make a separate PR for moving `driverServiceName` from `DriverServiceFeatureStep` to `KubernetesDriverConf`?
Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]
dongjoon-hyun commented on code in PR #46184: URL: https://github.com/apache/spark/pull/46184#discussion_r1576861006 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala: ## @@ -94,6 +96,20 @@ private[spark] class KubernetesDriverConf( custom.getOrElse(KubernetesConf.getResourceNamePrefix(appName)) } + val driverServiceName: String = { Review Comment: Shall we use `lazy val`, since this is not used in `KubernetesDriverConf`?
Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]
dongjoon-hyun commented on code in PR #46184: URL: https://github.com/apache/spark/pull/46184#discussion_r1576856363 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala: ## @@ -108,5 +101,5 @@ private[spark] object DriverServiceFeatureStep { val DRIVER_BIND_ADDRESS_KEY = config.DRIVER_BIND_ADDRESS.key val DRIVER_HOST_KEY = config.DRIVER_HOST_ADDRESS.key val DRIVER_SVC_POSTFIX = "-driver-svc" - val MAX_SERVICE_NAME_LENGTH = KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH + val MAX_SERVICE_NAME_LENGTH = KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH - "-ingress".length Review Comment: Please don't do that. This is a huge regression for the existing system, since it reduces the space of service names.
Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]
dongjoon-hyun commented on code in PR #46184: URL: https://github.com/apache/spark/pull/46184#discussion_r1576855288 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala: ## @@ -87,16 +77,19 @@ private[spark] class DriverServiceFeatureStep( .withName(DRIVER_PORT_NAME) .withPort(driverPort) .withNewTargetPort(driverPort) + .withProtocol("TCP") Review Comment: To @pan3793, I'm not sure we need to have this code in `master`. I'd recommend removing this here and not making an independent PR.
[PR] [WIP] Ensure that ForeachBatch can use libraries imported externally [spark]
ericm-db opened a new pull request, #46191: URL: https://github.com/apache/spark/pull/46191 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-47921][CONNECT] Fix ExecuteJobTag creation in ExecuteHolder [spark]
ueshin commented on PR #46140: URL: https://github.com/apache/spark/pull/46140#issuecomment-2073342960 We may instead want to fix the arguments of the `apply` function to be consistent with how they are used and with the other similar functions: https://github.com/apache/spark/blob/2a64b706135a54e177ca2e5300ccec15c4dae8bb/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala#L296-L304 https://github.com/apache/spark/blob/2a64b706135a54e177ca2e5300ccec15c4dae8bb/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala#L312-L321
[PR] [SPARK-47633][SQL][3.5] Include right-side plan output in `LateralJoin#allAttributes` for more consistent canonicalization [spark]
bersprockets opened a new pull request, #46190: URL: https://github.com/apache/spark/pull/46190 This is a backport of #45763 to branch-3.5. ### What changes were proposed in this pull request? Modify `LateralJoin` to include right-side plan output in `allAttributes`. ### Why are the changes needed? In the following example, the view v1 is cached, but a query of v1 does not use the cache: ``` CREATE or REPLACE TEMP VIEW t1(c1, c2) AS VALUES (0, 1), (1, 2); CREATE or REPLACE TEMP VIEW t2(c1, c2) AS VALUES (0, 1), (1, 2); create or replace temp view v1 as select * from t1 join lateral ( select c1 as a, c2 as b from t2) on c1 = a; cache table v1; explain select * from v1; == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- BroadcastHashJoin [c1#180], [a#173], Inner, BuildRight, false :- LocalTableScan [c1#180, c2#181] +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=113] +- LocalTableScan [a#173, b#174] ``` The canonicalized version of the `LateralJoin` node is not consistent when there is a join condition. For example, for the above query, the join condition is canonicalized as follows: ``` Before canonicalization: Some((c1#174 = a#167)) After canonicalization: Some((none#0 = none#167)) ``` You can see that the `exprId` for the second operand of `EqualTo` is not normalized (it remains 167). That's because the attribute `a` from the right-side plan is not included in `allAttributes`. This PR adds right-side attributes to `allAttributes` so that references to right-side attributes in the join condition are normalized during canonicalization. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New test. ### Was this patch authored or co-authored using generative AI tooling? No.
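The normalization failure described above can be modeled outside Spark: canonicalization replaces each expression id that appears in the plan's known attribute list with its ordinal position, and any id missing from that list leaks through unchanged. The function below is a deliberately simplified toy model of that idea, not Spark's actual canonicalization code.

```python
def canonicalize(condition_ids, all_attribute_ids):
    """Replace each expression id with its position in all_attribute_ids.

    Ids not found in the list are left as-is -- which is exactly what made
    two otherwise-identical LateralJoin plans canonicalize differently when
    the right-side output was missing from allAttributes.
    """
    normalized = []
    for expr_id in condition_ids:
        if expr_id in all_attribute_ids:
            normalized.append(all_attribute_ids.index(expr_id))
        else:
            normalized.append(expr_id)  # un-normalized id leaks through
    return normalized


left_output = [174]      # c1#174 from the left side
right_output = [167]     # a#167 from the lateral subquery
condition = [174, 167]   # c1#174 = a#167

# Before the fix: only left-side attributes contribute.
print(canonicalize(condition, left_output))                 # [0, 167]
# After the fix: right-side output is included, so both ids normalize.
print(canonicalize(condition, left_output + right_output))  # [0, 1]
```

With both ids normalized, the cached plan and the new query plan produce identical canonical forms, so the cache lookup matches.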
Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator. [spark]
sahnib commented on code in PR #45376: URL: https://github.com/apache/spark/pull/45376#discussion_r1576757237 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -78,15 +78,20 @@ case class TransformWithStateExec( override def shortName: String = "transformWithStateExec" override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = { -timeMode match { - case ProcessingTime => -// TODO: check if we can return true only if actual timers are registered, or there is -// expired state -true - case EventTime => -eventTimeWatermarkForEviction.isDefined && - newInputWatermark > eventTimeWatermarkForEviction.get - case _ => false +if (outputMode == OutputMode.Append || outputMode == OutputMode.Update) { Review Comment: This condition should always be true for timeMode set to ProcessingTime. I have refactored this condition.
Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator. [spark]
sahnib commented on code in PR #45376: URL: https://github.com/apache/spark/pull/45376#discussion_r1576756401 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala: ## @@ -107,25 +109,67 @@ case class EventTimeWatermarkExec( } // Update the metadata on the eventTime column to include the desired delay. - override val output: Seq[Attribute] = child.output.map { a => -if (a semanticEquals eventTime) { - val updatedMetadata = new MetadataBuilder() -.withMetadata(a.metadata) -.putLong(EventTimeWatermark.delayKey, delayMs) -.build() - a.withMetadata(updatedMetadata) -} else if (a.metadata.contains(EventTimeWatermark.delayKey)) { - // Remove existing watermark - val updatedMetadata = new MetadataBuilder() -.withMetadata(a.metadata) -.remove(EventTimeWatermark.delayKey) -.build() - a.withMetadata(updatedMetadata) -} else { - a -} + override val output: Seq[Attribute] = { +val delayMs = EventTimeWatermark.getDelayMs(delay) +updateEventTimeColumn(child.output, delayMs, eventTime) } override protected def withNewChildInternal(newChild: SparkPlan): EventTimeWatermarkExec = copy(child = newChild) } + +/** + * Updates the event time column to [[eventTime]] in the child output. + * Any watermark calculations performed after this node will use the + * updated eventTimeColumn. + * + * This node also ensures that output emitted by the child node adheres + * to watermark. If the child node emits rows which are older than global + * watermark, the node will throw an query execution error and fail the user + * query. + */ +case class UpdateEventTimeColumnExec( +eventTime: Attribute, +delay: CalendarInterval, +eventTimeWatermarkForEviction: Option[Long], +child: SparkPlan) extends UnaryExecNode with Logging { Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator. [spark]
sahnib commented on code in PR #45376: URL: https://github.com/apache/spark/pull/45376#discussion_r1576754116 ## sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala: ## @@ -739,6 +741,128 @@ class KeyValueGroupedDataset[K, V] private[sql]( ) } + /** + * (Scala-specific) Review Comment: Default values for API params have an issue with Scala/Java compatibility. I guess that's why none of the other APIs have default params either.
Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator. [spark]
sahnib commented on code in PR #45376: URL: https://github.com/apache/spark/pull/45376#discussion_r1576751501 ## sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala: ## @@ -762,6 +890,7 @@ class KeyValueGroupedDataset[K, V] private[sql]( timeMode: TimeMode, outputMode: OutputMode, initialState: KeyValueGroupedDataset[K, S], + eventTimeColumnName: String, Review Comment: Actually, we need multiple separate Java APIs (for combinations of eventTimeColumn and state). I have added them, thanks. ## sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala: ## @@ -762,6 +890,7 @@ class KeyValueGroupedDataset[K, V] private[sql]( timeMode: TimeMode, outputMode: OutputMode, initialState: KeyValueGroupedDataset[K, S], + eventTimeColumnName: String, outputEncoder: Encoder[U], initialStateEncoder: Encoder[S]): Dataset[U] = { transformWithState(statefulProcessor, timeMode, Review Comment: Fixed.
[PR] [DO-NOT-REVIEW] PySpark Dataframe doc test improvement [spark]
WweiL opened a new pull request, #46189: URL: https://github.com/apache/spark/pull/46189 ### What changes were proposed in this pull request? Improve the doc test; previously it used a batch df and didn't really start the test. ### Why are the changes needed? Test coverage improvement ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Test only addition ### Was this patch authored or co-authored using generative AI tooling? No
Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]
pan3793 commented on PR #46184: URL: https://github.com/apache/spark/pull/46184#issuecomment-2073145778 cc @dongjoon-hyun @yaooqinn @LuciferYang @EnricoMi I would appreciate it if you could have a look at this idea
Re: [PR] [SPARK-47365][PYTHON] Add _toArrow() DataFrame method to PySpark [spark]
ianmcook commented on code in PR #45481: URL: https://github.com/apache/spark/pull/45481#discussion_r1576718516 ## python/pyspark/sql/connect/dataframe.py: ## @@ -1775,6 +1775,12 @@ def _to_table(self) -> Tuple["pa.Table", Optional[StructType]]: assert table is not None return (table, schema) +def _toArrow(self) -> "pa.Table": +table = self._to_table()[0] +return table + +_toArrow.__doc__ = PySparkDataFrame._toArrow.__doc__ Review Comment: @HyukjinKwon is this line no longer needed after https://github.com/apache/spark/pull/46129?
Re: [PR] [SPARK-47954][K8S] Support creating ingress entry for external UI access [spark]
pan3793 commented on code in PR #46184: URL: https://github.com/apache/spark/pull/46184#discussion_r1576715545 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala: ## @@ -87,16 +77,19 @@ private[spark] class DriverServiceFeatureStep( .withName(DRIVER_PORT_NAME) .withPort(driverPort) .withNewTargetPort(driverPort) + .withProtocol("TCP") Review Comment: Not related to the feature proposed by this PR, I just found our internal cluster(1.19, I know Spark already dropped official support for such an old version) complains `protocol` is omitted and failed to create resources without this change. If this change is acceptable, I can submit it in an independent PR. ``` 24/04/23 23:48:25 TRACE HttpLoggingInterceptor: -HTTP START- 24/04/23 23:48:25 TRACE HttpLoggingInterceptor: > PATCH https://.org:6443/api/v1/namespaces/spark/services/spark-8e2c0b8f0ba4d21f-driver-svc?fieldManager=fabric8=true 24/04/23 23:48:25 TRACE HttpLoggingInterceptor: > Authorization: Bearer * 24/04/23 23:48:25 TRACE HttpLoggingInterceptor: > User-Agent: fabric8-kubernetes-client/6.12.1 24/04/23 23:48:25 TRACE HttpLoggingInterceptor: {"apiVersion":"v1","kind":"Service","metadata":{"labels":{"spark-app-selector":"spark-0d5c3db763904e7585d9801295ded9ed"},"name":"spark-8e2c0b8f0ba4d21f-driver-svc","ownerReferences":[{"apiVersion":"v1","kind":"Pod","controller":true,"name":"org-apache-spark-examples-driversubmissiontest-ee4d378f0ba4d21a-driver","uid":"e29f255a-dd1f-45c3-89c4-1c30e1b51d6e"}]},"spec":{"clusterIP":"None","ports":[{"name":"driver-rpc-port","port":7078,"targetPort":7078},{"name":"blockmanager","port":7079,"targetPort":7079},{"name":"spark-ui","port":4040,"targetPort":4040}],"selector":{"spark-version":"4.0.0-SNAPSHOT","spark-app-selector":"spark-0d5c3db763904e7585d9801295ded9ed","spark-app-name":"org-apache-spark-examples-driversubmissiontest","spark-role":"driver"}}} 24/04/23 23:48:25 TRACE HttpLoggingInterceptor: < 500 
Internal Server Error 24/04/23 23:48:25 TRACE HttpLoggingInterceptor: < cache-control: no-cache, private 24/04/23 23:48:25 TRACE HttpLoggingInterceptor: < content-length: 229 24/04/23 23:48:25 TRACE HttpLoggingInterceptor: < content-type: application/json 24/04/23 23:48:25 TRACE HttpLoggingInterceptor: < date: Tue, 23 Apr 2024 15:48:25 GMT 24/04/23 23:48:25 TRACE HttpLoggingInterceptor: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"failed to create typed patch object: .spec.ports: element 0: associative list with keys has an element that omits key field \"protocol\"","code":500} 24/04/23 23:48:25 TRACE HttpLoggingInterceptor: -HTTP END- ```
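For reference, a patch body that satisfies this server-side-apply check carries an explicit `protocol` on every port entry, since `protocol` is part of the merge key for the `spec.ports` associative list. The fragment below is a sketch of the intended request mirroring the ports from the trace above, not copied from the PR:

```json
{
  "spec": {
    "ports": [
      {"name": "driver-rpc-port", "port": 7078, "targetPort": 7078, "protocol": "TCP"},
      {"name": "blockmanager", "port": 7079, "targetPort": 7079, "protocol": "TCP"},
      {"name": "spark-ui", "port": 4040, "targetPort": 4040, "protocol": "TCP"}
    ]
  }
}
```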
Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator. [spark]
sahnib commented on code in PR #45376: URL: https://github.com/apache/spark/pull/45376#discussion_r1576708999 ## common/utils/src/main/resources/error/error-conditions.json: ## @@ -1057,6 +1063,14 @@ }, "sqlState" : "4274K" }, + "EMITTING_ROWS_OLDER_THAN_WATERMARK_NOT_ALLOWED" : { +"message" : [ + "Previous node emitted rows which had eventTime older than current watermark value ", Review Comment: Done
Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator. [spark]
anishshri-db commented on code in PR #45376: URL: https://github.com/apache/spark/pull/45376#discussion_r1576703836 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -78,15 +78,20 @@ case class TransformWithStateExec( override def shortName: String = "transformWithStateExec" override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = { -timeMode match { - case ProcessingTime => -// TODO: check if we can return true only if actual timers are registered, or there is -// expired state -true - case EventTime => -eventTimeWatermarkForEviction.isDefined && - newInputWatermark > eventTimeWatermarkForEviction.get - case _ => false +if (outputMode == OutputMode.Append || outputMode == OutputMode.Update) { Review Comment: Hmm - won't we have an issue with `Append` mode using a proc-time-based time mode?
Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator. [spark]
anishshri-db commented on code in PR #45376: URL: https://github.com/apache/spark/pull/45376#discussion_r1576701553 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala: ## @@ -107,25 +109,67 @@ case class EventTimeWatermarkExec( } // Update the metadata on the eventTime column to include the desired delay. - override val output: Seq[Attribute] = child.output.map { a => -if (a semanticEquals eventTime) { - val updatedMetadata = new MetadataBuilder() -.withMetadata(a.metadata) -.putLong(EventTimeWatermark.delayKey, delayMs) -.build() - a.withMetadata(updatedMetadata) -} else if (a.metadata.contains(EventTimeWatermark.delayKey)) { - // Remove existing watermark - val updatedMetadata = new MetadataBuilder() -.withMetadata(a.metadata) -.remove(EventTimeWatermark.delayKey) -.build() - a.withMetadata(updatedMetadata) -} else { - a -} + override val output: Seq[Attribute] = { +val delayMs = EventTimeWatermark.getDelayMs(delay) +updateEventTimeColumn(child.output, delayMs, eventTime) } override protected def withNewChildInternal(newChild: SparkPlan): EventTimeWatermarkExec = copy(child = newChild) } + +/** + * Updates the event time column to [[eventTime]] in the child output. + * Any watermark calculations performed after this node will use the + * updated eventTimeColumn. + * + * This node also ensures that output emitted by the child node adheres + * to watermark. If the child node emits rows which are older than global + * watermark, the node will throw an query execution error and fail the user + * query. 
+ */ +case class UpdateEventTimeColumnExec( +eventTime: Attribute, +delay: CalendarInterval, +eventTimeWatermarkForEviction: Option[Long], +child: SparkPlan) extends UnaryExecNode with Logging { + + override protected def doExecute(): RDD[InternalRow] = { +child.execute().mapPartitions[InternalRow]( + dataIterator => { +val watermarkExpression = WatermarkSupport.watermarkExpression( + Some(eventTime), eventTimeWatermarkForEviction) + +if (watermarkExpression.isEmpty) { + // watermark should always be defined in this node. + throw QueryExecutionErrors.cannotGetEventTimeWatermarkError() +} + +val predicate = Predicate.create(watermarkExpression.get, child.output) +new Iterator[InternalRow] { + override def hasNext: Boolean = dataIterator.hasNext + override def next(): InternalRow = { +val nextRow = dataIterator.next() +if (predicate.eval(nextRow)) { + // child node emitted a row which is older than current watermark + // which is not allowed + throw QueryExecutionErrors.emittedRowsAreOlderThanWatermark( +eventTimeWatermarkForEviction.get) +} +nextRow + } +} + }, + preservesPartitioning = true Review Comment: What does this do exactly? Could we add a comment just to clarify for the reader?
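The late-row guard quoted above can be sketched as a plain iterator wrapper: rows stream through untouched, and the first row whose event time is behind the watermark fails the query. The names below (`enforce_watermark`, `RowOlderThanWatermarkError`) are hypothetical stand-ins for the operator's predicate check, not Spark APIs.

```python
class RowOlderThanWatermarkError(Exception):
    """Raised when an emitted row's event time is behind the watermark."""


def enforce_watermark(rows, watermark_ms):
    """Yield (event_time_ms, payload) pairs, failing fast on late rows.

    Mirrors the predicate in UpdateEventTimeColumnExec: rows pass through
    unchanged unless their event time is older than the current watermark.
    """
    for event_time_ms, payload in rows:
        if event_time_ms < watermark_ms:
            raise RowOlderThanWatermarkError(
                f"row event time {event_time_ms} < watermark {watermark_ms}")
        yield event_time_ms, payload


rows = [(1_000, "a"), (2_000, "b"), (500, "late")]
out = []
try:
    for row in enforce_watermark(rows, watermark_ms=900):
        out.append(row)
except RowOlderThanWatermarkError as exc:
    print("query failed:", exc)
print(out)  # [(1000, 'a'), (2000, 'b')]
```

Because the check is evaluated lazily per row inside `mapPartitions`, well-behaved batches pay only a predicate evaluation per row and no buffering.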
Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator. [spark]
anishshri-db commented on code in PR #45376: URL: https://github.com/apache/spark/pull/45376#discussion_r1576698993 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala: ## @@ -107,25 +109,67 @@ case class EventTimeWatermarkExec( } // Update the metadata on the eventTime column to include the desired delay. - override val output: Seq[Attribute] = child.output.map { a => -if (a semanticEquals eventTime) { - val updatedMetadata = new MetadataBuilder() -.withMetadata(a.metadata) -.putLong(EventTimeWatermark.delayKey, delayMs) -.build() - a.withMetadata(updatedMetadata) -} else if (a.metadata.contains(EventTimeWatermark.delayKey)) { - // Remove existing watermark - val updatedMetadata = new MetadataBuilder() -.withMetadata(a.metadata) -.remove(EventTimeWatermark.delayKey) -.build() - a.withMetadata(updatedMetadata) -} else { - a -} + override val output: Seq[Attribute] = { +val delayMs = EventTimeWatermark.getDelayMs(delay) +updateEventTimeColumn(child.output, delayMs, eventTime) } override protected def withNewChildInternal(newChild: SparkPlan): EventTimeWatermarkExec = copy(child = newChild) } + +/** + * Updates the event time column to [[eventTime]] in the child output. + * Any watermark calculations performed after this node will use the + * updated eventTimeColumn. + * + * This node also ensures that output emitted by the child node adheres + * to watermark. If the child node emits rows which are older than global + * watermark, the node will throw an query execution error and fail the user + * query. + */ +case class UpdateEventTimeColumnExec( +eventTime: Attribute, +delay: CalendarInterval, +eventTimeWatermarkForEviction: Option[Long], +child: SparkPlan) extends UnaryExecNode with Logging { Review Comment: Are we using `Logging` here ? should we remove this and the import otherwise ? -- This is an automated message from the Apache Git Service. 
Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator. [spark]
anishshri-db commented on code in PR #45376: URL: https://github.com/apache/spark/pull/45376#discussion_r1576698149 ## sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala: ## @@ -739,6 +741,128 @@ class KeyValueGroupedDataset[K, V] private[sql]( ) } + /** + * (Scala-specific) + * Invokes methods defined in the stateful processor used in arbitrary state API v2. + * We allow the user to act on per-group set of input rows along with keyed state and the + * user can choose to output/return 0 or more rows. + * For a streaming dataframe, we will repeatedly invoke the interface methods for new rows + * in each trigger and the user's state/state variables will be stored persistently across + * invocations. + * + * @tparam U The type of the output objects. Must be encodable to Spark SQL types. + * @param statefulProcessor Instance of statefulProcessor whose functions will + * be invoked by the operator. + * @param timeMode The time mode semantics of the stateful processor for timers and TTL. + * @param eventTimeColumnName eventTime column in the output dataset. Any operations after + *transformWithState will use the new eventTimeColumn. The user + *needs to ensure that the eventTime for emitted output adheres to + *the watermark boundary, otherwise streaming query will fail. + * @param outputModeThe output mode of the stateful processor. + * + * See [[Encoder]] for more details on what types are encodable to Spark SQL. 
+ */ + private[sql] def transformWithState[U: Encoder]( + statefulProcessor: StatefulProcessor[K, V, U], + timeMode: TimeMode, + eventTimeColumnName: String, + outputMode: OutputMode): Dataset[U] = { +val existingWatermarkDelay = logicalPlan.flatMap { + case EventTimeWatermark(_, delay, _) => Seq(delay) + case _ => Seq() +} + +if (existingWatermarkDelay.isEmpty) { + throw QueryCompilationErrors.cannotAssignEventTimeColumn() +} + +val transformWithState = TransformWithState[K, V, U]( + groupingAttributes, + dataAttributes, + statefulProcessor, + timeMode, + outputMode, + child = logicalPlan +) + +val twsDS = Dataset[U]( + sparkSession, + transformWithState +) + +val delay = existingWatermarkDelay.head + +Dataset[U](sparkSession, EliminateEventTimeWatermark( + UpdateEventTimeWatermarkColumn( +UnresolvedAttribute(eventTimeColumnName), +delay, +twsDS.logicalPlan))) + } + + /** + * (Scala-specific) + * Invokes methods defined in the stateful processor used in arbitrary state API v2. + * Functions as the function above, but with additional initial state. + * + * @tparam U The type of the output objects. Must be encodable to Spark SQL types. + * @tparam S The type of initial state objects. Must be encodable to Spark SQL types. + * @param statefulProcessor Instance of statefulProcessor whose functions will + *be invoked by the operator. + * @param timeModeThe time mode semantics of the stateful processor for + *timers and TTL. + * @param eventTimeColumnName eventTime column in the output dataset. Any operations after + *transformWithState will use the new eventTimeColumn. The user + *needs to ensure that the eventTime for emitted output adheres to + *the watermark boundary, otherwise streaming query will fail. + * @param outputMode The output mode of the stateful processor. + * @param initialStateUser provided initial state that will be used to initiate state for + *the query in the first batch. 
+ * + * See [[Encoder]] for more details on what types are encodable to Spark SQL. + */ + private[sql] def transformWithState[U: Encoder, S: Encoder]( + statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S], + timeMode: TimeMode, + eventTimeColumnName: String, + outputMode: OutputMode, + initialState: KeyValueGroupedDataset[K, S]): Dataset[U] = { +val existingWatermarkDelay = logicalPlan.collect { + case EventTimeWatermark(_, delay, _) => delay +} + +if (existingWatermarkDelay.isEmpty) { Review Comment: Could we consolidate this code with the one above using a private function? I guess the only diff here is that we are passing the `initialState` dataset? ## sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala: ## @@ -739,6 +741,128 @@ class KeyValueGroupedDataset[K, V] private[sql]( ) } + /** + * (Scala-specific) + * Invokes methods defined in the stateful processor used in arbitrary
Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator. [spark]
anishshri-db commented on code in PR #45376: URL: https://github.com/apache/spark/pull/45376#discussion_r1576691015 ## sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala: ## @@ -762,6 +890,7 @@ class KeyValueGroupedDataset[K, V] private[sql]( timeMode: TimeMode, outputMode: OutputMode, initialState: KeyValueGroupedDataset[K, S], + eventTimeColumnName: String, Review Comment: Don't we need a separate Java API for this?
Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator. [spark]
anishshri-db commented on code in PR #45376: URL: https://github.com/apache/spark/pull/45376#discussion_r1576693458 ## sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala: ## @@ -739,6 +741,128 @@ class KeyValueGroupedDataset[K, V] private[sql]( ) } + /** + * (Scala-specific) Review Comment: This will add 4 more interfaces - 2 in Scala and potentially 2 in Java. Could we consolidate the existing interfaces to optionally take an `eventTimeColumnName` instead?
Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator. [spark]
anishshri-db commented on code in PR #45376: URL: https://github.com/apache/spark/pull/45376#discussion_r1576692106 ## sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala: ## @@ -762,6 +890,7 @@ class KeyValueGroupedDataset[K, V] private[sql]( timeMode: TimeMode, outputMode: OutputMode, initialState: KeyValueGroupedDataset[K, S], + eventTimeColumnName: String, outputEncoder: Encoder[U], initialStateEncoder: Encoder[S]): Dataset[U] = { transformWithState(statefulProcessor, timeMode, Review Comment: We don't use `eventTimeColumnName` here?
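The consolidation suggested in the review above could be done with a single entry point that takes an optional event-time column rather than new overloads per combination. A minimal Python sketch of that design choice follows; the function and field names are illustrative, not the actual Spark API.

```python
from typing import Optional

# Hypothetical sketch: one entry point with an optional event-time column,
# so existing call sites keep working and no extra overloads are needed.
def transform_with_state(processor, time_mode, output_mode,
                         event_time_column_name: Optional[str] = None):
    plan = {
        "processor": processor,
        "timeMode": time_mode,
        "outputMode": output_mode,
    }
    # Only attach the watermark-redefining column when the caller supplies one.
    if event_time_column_name is not None:
        plan["eventTimeColumnName"] = event_time_column_name
    return plan

# Existing callers need no change:
legacy = transform_with_state("proc", "ProcessingTime", "Append")
# New callers opt in to chaining stateful operators downstream:
chained = transform_with_state("proc", "EventTime", "Append",
                               event_time_column_name="outputEventTime")
```

With default-argument support on the Scala side this would avoid doubling the Scala and Java interface count, at the cost of one wider signature.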
Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator. [spark]
anishshri-db commented on code in PR #45376: URL: https://github.com/apache/spark/pull/45376#discussion_r1576686293 ## common/utils/src/main/resources/error/error-conditions.json: ## @@ -1057,6 +1063,14 @@ }, "sqlState" : "4274K" }, + "EMITTING_ROWS_OLDER_THAN_WATERMARK_NOT_ALLOWED" : { +"message" : [ + "Previous node emitted rows which had eventTime older than current watermark value ", Review Comment: nit: `current_watermark_value=` ?
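The nit above is about making the message name the value it interpolates. Spark's error-conditions entries use named placeholders that are substituted at raise time, so the label in the message should match the parameter callers pass. A hedged Python sketch of that substitution pattern (the template text and parameter name here are illustrative):

```python
# Illustrative error-condition registry: messages contain <param> placeholders
# that are filled in with named values when the error is raised.
ERROR_CONDITIONS = {
    "EMITTING_ROWS_OLDER_THAN_WATERMARK_NOT_ALLOWED": (
        "Previous node emitted rows which had eventTime older than "
        "current_watermark_value=<currentWatermark>"
    ),
}

def format_error(error_class: str, **params) -> str:
    # Substitute each named parameter into its placeholder.
    template = ERROR_CONDITIONS[error_class]
    for name, value in params.items():
        template = template.replace(f"<{name}>", str(value))
    return template

msg = format_error("EMITTING_ROWS_OLDER_THAN_WATERMARK_NOT_ALLOWED",
                   currentWatermark=1000)
```

Labeling the interpolated value (`current_watermark_value=...`) makes the resulting message self-describing in logs.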
Re: [PR] [SPARK-47604][CORE] Resource managers: Migrate logInfo with variables to structured logging framework [spark]
gengliangwang commented on PR #46130: URL: https://github.com/apache/spark/pull/46130#issuecomment-2073052204 Thanks, merging to master.
Re: [PR] [SPARK-47604][CORE] Resource managers: Migrate logInfo with variables to structured logging framework [spark]
gengliangwang closed pull request #46130: [SPARK-47604][CORE] Resource managers: Migrate logInfo with variables to structured logging framework URL: https://github.com/apache/spark/pull/46130
Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]
ericm-db commented on code in PR #46125: URL: https://github.com/apache/spark/pull/46125#discussion_r1576656803 ## core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala: ## @@ -91,11 +91,27 @@ private[spark] class StreamingPythonRunner( new BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, bufferSize)) val resFromPython = dataIn.readInt() +if (resFromPython != 0) { + val errMessage = PythonWorkerUtils.readUTF(dataIn) + throw streamingPythonRunnerDidNotInitialize(resFromPython, errMessage) +} logInfo(s"Runner initialization succeeded (returned $resFromPython).") (dataOut, dataIn) } + def streamingPythonRunnerDidNotInitialize(resFromPython: Int, errMessage: String): Review Comment: Done
Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]
rangadi commented on code in PR #46125: URL: https://github.com/apache/spark/pull/46125#discussion_r1576629583 ## core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala: ## @@ -91,11 +91,27 @@ private[spark] class StreamingPythonRunner( new BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, bufferSize)) val resFromPython = dataIn.readInt() +if (resFromPython != 0) { + val errMessage = PythonWorkerUtils.readUTF(dataIn) + throw streamingPythonRunnerDidNotInitialize(resFromPython, errMessage) +} logInfo(s"Runner initialization succeeded (returned $resFromPython).") (dataOut, dataIn) } + def streamingPythonRunnerDidNotInitialize(resFromPython: Int, errMessage: String): Review Comment: minor: rename to '...RunnerInitializationFailure' or '...FailedToInitialize' ... ## core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala: ## @@ -91,11 +91,27 @@ private[spark] class StreamingPythonRunner( new BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, bufferSize)) val resFromPython = dataIn.readInt() +if (resFromPython != 0) { + val errMessage = PythonWorkerUtils.readUTF(dataIn) + throw streamingPythonRunnerDidNotInitialize(resFromPython, errMessage) +} logInfo(s"Runner initialization succeeded (returned $resFromPython).") (dataOut, dataIn) } + def streamingPythonRunnerDidNotInitialize(resFromPython: Int, errMessage: String): +StreamingPythonRunnerInitializationException = { +new StreamingPythonRunnerInitializationException(resFromPython, errMessage) + } + + class StreamingPythonRunnerInitializationException(resFromPython: Int, errMessage: String) +extends SparkPythonException( + errorClass = "STREAMING_PYTHON_RUNNER_DID_NOT_INITIALIZE", Review Comment: Similar rename here. This is what the user will see.
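The diff under discussion adds a handshake: the Python worker writes a status int (0 on success), and on failure follows it with a UTF-8 error string that the Scala `StreamingPythonRunner` reads back and surfaces to the user. A hedged sketch of that wire protocol in Python follows; the 4-byte big-endian int plus length-prefixed UTF-8 string mirrors `java.io.DataOutputStream` conventions, but the exact framing here is an assumption for illustration.

```python
import struct
from io import BytesIO

def write_init_response(out, status: int, err_message: str = "") -> None:
    # 4-byte big-endian status; 0 means the worker initialized successfully.
    out.write(struct.pack(">i", status))
    if status != 0:
        # On failure, append a length-prefixed UTF-8 error message.
        encoded = err_message.encode("utf-8")
        out.write(struct.pack(">i", len(encoded)))
        out.write(encoded)

def read_init_response(inp) -> None:
    # Mirrors the Scala side: read the status, and on failure read the
    # message and raise so the user sees the worker's actual error.
    (status,) = struct.unpack(">i", inp.read(4))
    if status != 0:
        (n,) = struct.unpack(">i", inp.read(4))
        err = inp.read(n).decode("utf-8")
        raise RuntimeError(
            f"Streaming runner initialization failed (returned {status}): {err}")

# Simulate a failed worker startup over an in-memory buffer.
buf = BytesIO()
write_init_response(buf, -1, "ImportError: No module named 'grpc'")
buf.seek(0)
caught = None
try:
    read_init_response(buf)
except RuntimeError as exc:
    caught = str(exc)
```

Before this change the Scala side only logged the int, so an import failure in the worker surfaced as an opaque initialization error; propagating the message string is what the PR fixes.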
Re: [PR] [SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator. [spark]
anishshri-db commented on PR #45376: URL: https://github.com/apache/spark/pull/45376#issuecomment-2072985699 @sahnib - seems like there are still conflicts on the base branch?
Re: [PR] [SPARK-47864][FOLLOWUP][PYTHON][DOCS] Fix minor typo: "MLLib" -> "MLlib" [spark]
xinrong-meng commented on PR #46174: URL: https://github.com/apache/spark/pull/46174#issuecomment-2072958632 Merged to master
Re: [PR] [SPARK-47864][FOLLOWUP][PYTHON][DOCS] Fix minor typo: "MLLib" -> "MLlib" [spark]
xinrong-meng closed pull request #46174: [SPARK-47864][FOLLOWUP][PYTHON][DOCS] Fix minor typo: "MLLib" -> "MLlib" URL: https://github.com/apache/spark/pull/46174
Re: [PR] [SPARK-47864][FOLLOWUP][PYTHON][DOCS] Fix minor typo: "MLLib" -> "MLlib" [spark]
xinrong-meng commented on PR #46174: URL: https://github.com/apache/spark/pull/46174#issuecomment-2072955519 LGTM, thanks!
Re: [PR] [SPARK-47050][SQL] Collect and publish partition level metrics for V1 [spark]
snmvaughan commented on PR #46188: URL: https://github.com/apache/spark/pull/46188#issuecomment-2072876115 cc @cloud-fan
[PR] [SPARK-47050][SQL] Collect and publish partition level metrics for V1 [spark]
snmvaughan opened a new pull request, #46188: URL: https://github.com/apache/spark/pull/46188 We currently capture metrics which include the number of files, bytes and rows for a task along with the updated partitions. This change captures metrics for each updated partition, reporting the partition sub-paths along with the number of files, bytes, and rows per partition for each task. ### What changes were proposed in this pull request? 1. Update the `WriteTaskStatsTracker` implementation to associate a partition with the file during writing, and to track the number of rows written to each file. The final stats now include a map of partitions and the associated partition stats 2. Update the `WriteJobStatsTracker` implementation to capture the partition subpaths and to publish a new Event to the listener bus. The processed stats aggregate the statistics for each partition ### Why are the changes needed? This increases our understanding of written data by tracking the impact for each task on our datasets ### Does this PR introduce _any_ user-facing change? This makes partition-level data accessible through a new event. ### How was this patch tested? In addition to the new unit tests, this was run in a Kubernetes environment writing tables with differing partitioning strategies and validating the reported stats. Unit tests using both `InsertIntoHadoopFsRelationCommand` and `InsertIntoHiveTable` now verify partition stats when dynamic partitioning is enabled. We also verified that the aggregated partition metrics matched the existing metrics for number of files, bytes, and rows. ### Was this patch authored or co-authored using generative AI tooling? No
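The job-level aggregation the PR describes, where each task reports stats keyed by partition subpath and the tracker sums them so totals still match the existing job-wide metrics, can be sketched as follows. The dictionary shape and partition paths are illustrative, not the actual `WriteJobStatsTracker` types.

```python
from collections import defaultdict

def aggregate_partition_stats(task_stats):
    """Sum per-task {partition_subpath: {files, bytes, rows}} maps into
    job-level per-partition totals."""
    totals = defaultdict(lambda: {"files": 0, "bytes": 0, "rows": 0})
    for stats in task_stats:
        for partition, s in stats.items():
            for key in ("files", "bytes", "rows"):
                totals[partition][key] += s[key]
    return dict(totals)

# Two tasks that both wrote into a dynamically partitioned table:
task1 = {"year=2024/month=01": {"files": 1, "bytes": 100, "rows": 10}}
task2 = {"year=2024/month=01": {"files": 2, "bytes": 50, "rows": 5},
         "year=2024/month=02": {"files": 1, "bytes": 80, "rows": 8}}
agg = aggregate_partition_stats([task1, task2])
```

Summing the per-partition values back up should reproduce the existing job-wide file/byte/row counts, which is the consistency check the PR's tests verify.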
[PR] [SPARK-47958][TESTS] Change LocalSchedulerBackend to notify scheduler of executor on start [spark]
davintjong-db opened a new pull request, #46187: URL: https://github.com/apache/spark/pull/46187 ### What changes were proposed in this pull request? Changing to call `reviveOffers` on start (after the local executor is set up) so that the task scheduler knows about it. This matches behavior in CoarseGrainedSchedulerBackend, which will call an equivalent method on executor registration. ### Why are the changes needed? When using LocalSchedulerBackend, the task scheduler will not know about the executor until a task is run, which can lead to unexpected behavior in tests. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Running existing tests. ### Was this patch authored or co-authored using generative AI tooling? No.
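The behavioral change the PR describes, notifying the scheduler during `start()` rather than waiting for the first task, can be sketched minimally. Class and method names below are simplified stand-ins for the Scala originals.

```python
class TaskScheduler:
    """Stand-in for Spark's TaskSchedulerImpl: learns about executors
    when offered their resources."""
    def __init__(self):
        self.known_executors = set()

    def resource_offers(self, executor_id: str) -> None:
        self.known_executors.add(executor_id)

class LocalSchedulerBackend:
    def __init__(self, scheduler: TaskScheduler):
        self.scheduler = scheduler

    def start(self) -> None:
        # ... set up the local executor ...
        # New behavior: offer the executor's resources immediately, so the
        # scheduler knows about it before any task is submitted. This mirrors
        # CoarseGrainedSchedulerBackend, which does so on executor registration.
        self.revive_offers()

    def revive_offers(self) -> None:
        self.scheduler.resource_offers("driver")

sched = TaskScheduler()
LocalSchedulerBackend(sched).start()
```

Without the call in `start()`, `known_executors` would stay empty until a task ran, which is the test-only surprise the PR removes.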
Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]
ericm-db commented on code in PR #46125: URL: https://github.com/apache/spark/pull/46125#discussion_r1576514370 ## core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala: ## @@ -91,6 +91,12 @@ private[spark] class StreamingPythonRunner( new BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, bufferSize)) val resFromPython = dataIn.readInt() +if (resFromPython != 0) { + val errMessage = PythonWorkerUtils.readUTF(dataIn) + throw new PythonException(s"Streaming Runner initialization failed" + +s" (returned $resFromPython). " + +s"Error message: $errMessage", null) +} Review Comment: Created a new error class, PTAL
Re: [PR] [SPARK-47956][SQL] Sanity check for unresolved LCA reference [spark]
dongjoon-hyun commented on PR #46185: URL: https://github.com/apache/spark/pull/46185#issuecomment-2072773098 Merged to master for Apache Spark 4.0.0.