[jira] [Commented] (SPARK-42349) Support pandas cogroup with multiple df
[ https://issues.apache.org/jira/browse/SPARK-42349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684337#comment-17684337 ] Santosh Pingale commented on SPARK-42349:
-
From https://lists.apache.org/thread/jlpr3jm0slk33qhj0r9jvwmz8sq50vd8

RDD currently supports cogroup with up to 4 dataframes (ZippedPartitionsRDD4), whereas cogroup with pandas can handle only 2 dataframes (with ZippedPartitionsRDD2). In our use case, we do not have much control over how many dataframes we may need in the cogroup.applyInPandas function. To achieve this, we can:

(a) Implement ZippedPartitionsRDD5, ZippedPartitionsRDD..ZippedPartitionsRDD30..ZippedPartitionsRDD50 with respective iterators, serializers and so on. This keeps type safety intact, but a lot more boilerplate code has to be written to achieve it.
(b) Do not use cogroup.applyInPandas; instead use RDD.keyBy.cogroup and then getItem in a nested fashion, converting the data to a pandas df in the python function. This looks like a good workaround, but mistakes are easy to make, and we give up type safety from the user's point of view.
(c) Implement ZippedPartitionsRDDN and NaryLike with the childrenNodes type set to Seq[T], which allows an arbitrary number of children to be set. Here we have very little boilerplate, but we sacrifice type safety.
(d) ... some new suggestions... ?

I have an implementation of option (c) ready. Right now there are two code paths: one with the previous implementation and one with the new implementation. The previous implementation supported either 2 or 3 args for the UDF; if 3 args were passed, it automatically assumed that the first arg is the key. This assumption limits flexibility, so I propose that we remove the previous signature in favour of the new one; as the API is marked experimental, we shouldn't have issues making this change. In the new alternative, we explicitly pass an additional arg in `applyInPandas` called `pass_key`. If it is set to true, the key is passed as the first arg, followed by the dfs.

> Support pandas cogroup with multiple df
> ---
>
> Key: SPARK-42349
> URL: https://issues.apache.org/jira/browse/SPARK-42349
> Project: Spark
> Issue Type: Improvement
> Components: PySpark
> Affects Versions: 3.3.1
> Reporter: Santosh Pingale
> Priority: Trivial
>
> Currently pyspark supports `cogroup.applyInPandas` with only 2 dataframes. The
> improvement request is to support multiple dataframes with variable arity.

-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
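For illustration, a minimal sketch of workaround (b) using today's public RDD API: key each DataFrame's RDD by the grouping column, cogroup them all with groupWith, and rebuild pandas frames inside the Python function. The example data, the "id" key column, and the merge logic are assumptions made up for the sketch, not part of the ticket.

{code:python}
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# illustrative inputs; any number of dataframes keyed by the same column
df1 = spark.createDataFrame([(1, "a"), (2, "b")], "id int, v1 string")
df2 = spark.createDataFrame([(1, "x"), (3, "y")], "id int, v2 string")
df3 = spark.createDataFrame([(2, "p"), (3, "q")], "id int, v3 string")

# key every RDD by the grouping column, then cogroup them all at once;
# groupWith is the PySpark RDD cogroup variant that accepts multiple RDDs
rdds = [df.rdd.keyBy(lambda row: row["id"]) for df in (df1, df2, df3)]
cogrouped = rdds[0].groupWith(*rdds[1:])  # (key, (iterable1, iterable2, iterable3))

def merge(key, groups):
    # convert each group back to a pandas DataFrame by hand; this is where
    # type safety is lost, which is the drawback the comment calls out
    pdfs = [pd.DataFrame([r.asDict() for r in rows]) for rows in groups]
    return key, [len(p) for p in pdfs]

print(cogrouped.map(lambda kv: merge(kv[0], kv[1])).collect())
{code}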
[jira] [Created] (SPARK-42349) Support pandas cogroup with multiple df
Santosh Pingale created SPARK-42349:
---
Summary: Support pandas cogroup with multiple df
Key: SPARK-42349
URL: https://issues.apache.org/jira/browse/SPARK-42349
Project: Spark
Issue Type: Improvement
Components: PySpark
Affects Versions: 3.3.1
Reporter: Santosh Pingale

Currently pyspark supports `cogroup.applyInPandas` with only 2 dataframes. The improvement request is to support multiple dataframes with variable arity.

-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
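For context, the existing API that this ticket asks to generalize pairs exactly two grouped DataFrames. A minimal sketch follows; the schemas and the merge logic are illustrative assumptions.

{code:python}
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([(1, 1.0), (2, 2.0)], "id int, v1 double")
df2 = spark.createDataFrame([(1, "a"), (2, "b")], "id int, v2 string")

def merge(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    # the UDF receives exactly two pandas DataFrames, one per cogrouped side
    return pd.merge(left, right, on="id")

out = (
    df1.groupBy("id")
    .cogroup(df2.groupBy("id"))
    .applyInPandas(merge, schema="id int, v1 double, v2 string")
)
out.show()
{code}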
[jira] [Updated] (SPARK-42170) Files added to the spark-submit command with master K8s and deploy mode cluster, end up in a non deterministic location inside the driver.
[ https://issues.apache.org/jira/browse/SPARK-42170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Santosh Pingale updated SPARK-42170:
Description:
Files added to the spark-submit command with master K8s and deploy mode cluster end up in a non-deterministic location inside the driver.
e.g.: {{spark-submit --files myfile --master k8s.. --deploy-mode cluster}} will upload the files to {{/tmp/spark-uuid/myfile}}
The issue happens because [Utils.createTempDir()|https://github.com/apache/spark/blob/v3.3.1/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L344] creates a directory with a uuid in the directory name. This issue does not affect the --archives option, because we `unarchive` the archives into the destination directory, which is relative to the working dir. This bug affects file access pre & post app creation. For example, if we distribute python dependencies with pex, we need to use --files to attach the pex file and change the spark.pyspark.python to point to this file. But the file location cannot be determined before submitting the app. On the other hand, after the app is created, referencing the files without using `SparkFiles.get` also does not work.

was:
Files added to the spark-submit command with master K8s and deploy mode cluster end up in a non-deterministic location inside the driver.
e.g.: {{spark-submit --files myfile --master k8s.. --deploy-mode cluster}} will upload the files to {{/tmp/spark-uuid/myfile}}
The issue happens because [Utils.createTempDir()|https://github.com/apache/spark/blob/v3.3.1/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L344] creates a directory with a uuid in the directory name. This issue does not affect the `--archives` option, because we `unarchive` the archives into the destination directory, which is relative to the working dir. This bug affects file access pre & post app creation. For example, if we distribute python dependencies with pex, we need to use `--files` to attach the pex file and change the spark.pyspark.python to point to this file. But the file location cannot be determined before submitting the app. On the other hand, after the app is created, referencing the files without using `SparkFiles.get` also does not work.

> Files added to the spark-submit command with master K8s and deploy mode
> cluster, end up in a non deterministic location inside the driver.
> --
>
> Key: SPARK-42170
> URL: https://issues.apache.org/jira/browse/SPARK-42170
> Project: Spark
> Issue Type: Bug
> Components: Kubernetes, Spark Submit
> Affects Versions: 3.3.0, 3.2.2
> Reporter: Santosh Pingale
> Priority: Major
>
> Files added to the spark-submit command with master K8s and deploy mode cluster end up in a non-deterministic location inside the driver.
> e.g.: {{spark-submit --files myfile --master k8s.. --deploy-mode cluster}} will upload the files to {{/tmp/spark-uuid/myfile}}
> The issue happens because [Utils.createTempDir()|https://github.com/apache/spark/blob/v3.3.1/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L344] creates a directory with a uuid in the directory name. This issue does not affect the --archives option, because we `unarchive` the archives into the destination directory, which is relative to the working dir. This bug affects file access pre & post app creation. For example, if we distribute python dependencies with pex, we need to use --files to attach the pex file and change the spark.pyspark.python to point to this file.
> But the file location cannot be determined before submitting the app. On the other hand, after the app is created, referencing the files without using `SparkFiles.get` also does not work.

-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
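As a side note, a minimal sketch of the runtime lookup the report refers to: SparkFiles.get resolves a --files artifact after the application has started, which is too late for submit-time settings such as spark.pyspark.python. The file name "myfile" mirrors the spark-submit example above.

{code:python}
from pyspark import SparkFiles
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Resolves to the non-deterministic /tmp/spark-<uuid>/... location on the driver,
# but only once the application is already running.
path = SparkFiles.get("myfile")
print(path)
{code}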
[jira] [Created] (SPARK-42170) Files added to the spark-submit command with master K8s and deploy mode cluster, end up in a non deterministic location inside the driver.
Santosh Pingale created SPARK-42170:
---
Summary: Files added to the spark-submit command with master K8s and deploy mode cluster, end up in a non deterministic location inside the driver.
Key: SPARK-42170
URL: https://issues.apache.org/jira/browse/SPARK-42170
Project: Spark
Issue Type: Bug
Components: Kubernetes, Spark Submit
Affects Versions: 3.2.2, 3.3.0
Reporter: Santosh Pingale

Files added to the spark-submit command with master K8s and deploy mode cluster end up in a non-deterministic location inside the driver.
e.g.: {{spark-submit --files myfile --master k8s.. --deploy-mode cluster}} will upload the files to {{/tmp/spark-uuid/myfile}}
The issue happens because [Utils.createTempDir()|https://github.com/apache/spark/blob/v3.3.1/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L344] creates a directory with a uuid in the directory name. This issue does not affect the `--archives` option, because we `unarchive` the archives into the destination directory, which is relative to the working dir. This bug affects file access pre & post app creation. For example, if we distribute python dependencies with pex, we need to use `--files` to attach the pex file and change the spark.pyspark.python to point to this file. But the file location cannot be determined before submitting the app. On the other hand, after the app is created, referencing the files without using `SparkFiles.get` also does not work.

-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-40442) Unstable Spark history server: DB is closed
[ https://issues.apache.org/jira/browse/SPARK-40442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17610427#comment-17610427 ] Santosh Pingale edited comment on SPARK-40442 at 9/28/22 8:30 AM: -- {code:java} HTTP ERROR 500 java.lang.IllegalStateException: DB is closed. URI:https://xxx/sparkhistory/history/application_1664214774022_4650/1/jobs STATUS:500 MESSAGE:java.lang.IllegalState Exception: DB is closed. SERVLET:org.apache.spark.ui.JettyUtils$$anon$1-cd3a472 CAUSED BY:java.lang.IllegalStateException: DB is closed.Caused by:java.lang.IllegalStateException: DB is closed. at org.apache.spark.util.kvstore.LevelDB.db(LevelDB.java:364) at org.apache.spark.util.kvstore.LevelDBIterator.(LevelDBIterator.java:51) at org.apache.spark.util.kvstore.LevelDB$1.iterator(LevelDB.java:253) at org.apache.spark.util.kvstore.KVStoreView.closeableIterator(KVStoreView.java:117) at org.apache.spark.status.AppStatusStore.$anonfun$applicationInfo$1(AppStatusStore.scala:44) at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2741) at org.apache.spark.status.AppStatusStore.applicationInfo(AppStatusStore.scala:46) at org.apache.spark.ui.jobs.AllJobsPage.render(AllJobsPage.scala:276) at org.apache.spark.ui.WebUI.$anonfun$attachPage$1(WebUI.scala:90) at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:81) at javax.servlet.http.HttpServlet.service(HttpServlet.java:503) at javax.servlet.http.HttpServlet.service(HttpServlet.java:590) at org.sparkproject.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) at org.sparkproject.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1631) at org.apache.spark.ui.HttpSecurityFilter.doFilter(HttpSecurityFilter.scala:95) at org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) at org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) at org.apache.spark.deploy.history.ApplicationCacheCheckFilter.doFilter(ApplicationCache.scala:405) at org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) at org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) at org.sparkproject.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:548) at org.sparkproject.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) at org.sparkproject.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1434) at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) at org.sparkproject.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501) at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) at org.sparkproject.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1349) at org.sparkproject.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) at org.sparkproject.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:763) at org.sparkproject.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) at org.sparkproject.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) at org.sparkproject.jetty.server.Server.handle(Server.java:516) at org.sparkproject.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:400) at org.sparkproject.jetty.server.HttpChannel.dispatch(HttpChannel.java:645) at org.sparkproject.jetty.server.HttpChannel.handle(HttpChannel.java:392) at org.sparkproject.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) at 
org.sparkproject.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) at org.sparkproject.jetty.io.FillInterest.fillable(FillInterest.java:105) at org.sparkproject.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) at org.sparkproject.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409) at org.sparkproject.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883) at org.sparkproject.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034) at java.lang.Thread.run(Thread.java:748) {code} The
[jira] [Commented] (SPARK-40442) Unstable Spark history server: DB is closed
[ https://issues.apache.org/jira/browse/SPARK-40442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17610427#comment-17610427 ] Santosh Pingale commented on SPARK-40442: - {code:java} 2022-09-28 10:20:57,000 WARN /history/application_1664214774022_4650/1/jobs/ java.lang.IllegalStateException: DB is closed. at org.apache.spark.util.kvstore.LevelDB.db(LevelDB.java:364) at org.apache.spark.util.kvstore.LevelDBIterator.(LevelDBIterator.java:51) at org.apache.spark.util.kvstore.LevelDB$1.iterator(LevelDB.java:253) at org.apache.spark.util.kvstore.KVStoreView.closeableIterator(KVStoreView.java:117) at org.apache.spark.status.AppStatusStore.$anonfun$applicationInfo$1(AppStatusStore.scala:44) at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2741) at org.apache.spark.status.AppStatusStore.applicationInfo(AppStatusStore.scala:46) at org.apache.spark.ui.jobs.AllJobsPage.render(AllJobsPage.scala:276) at org.apache.spark.ui.WebUI.$anonfun$attachPage$1(WebUI.scala:90) at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:81) at javax.servlet.http.HttpServlet.service(HttpServlet.java:503) at javax.servlet.http.HttpServlet.service(HttpServlet.java:590) at org.sparkproject.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) at org.sparkproject.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1631) at org.apache.spark.ui.HttpSecurityFilter.doFilter(HttpSecurityFilter.scala:95) at org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) at org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) at org.apache.spark.deploy.history.ApplicationCacheCheckFilter.doFilter(ApplicationCache.scala:405) at org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) at org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) at org.sparkproject.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:548) at org.sparkproject.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) at org.sparkproject.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1434) at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) at org.sparkproject.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501) at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) at org.sparkproject.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1349) at org.sparkproject.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) at org.sparkproject.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:763) at org.sparkproject.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) at org.sparkproject.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) at org.sparkproject.jetty.server.Server.handle(Server.java:516) at org.sparkproject.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:400) at org.sparkproject.jetty.server.HttpChannel.dispatch(HttpChannel.java:645) at org.sparkproject.jetty.server.HttpChannel.handle(HttpChannel.java:392) at org.sparkproject.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) at org.sparkproject.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) at org.sparkproject.jetty.io.FillInterest.fillable(FillInterest.java:105) at org.sparkproject.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) at 
org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) at org.sparkproject.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409) at org.sparkproject.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883) at org.sparkproject.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034) at java.lang.Thread.run(Thread.java:748) {code} The error seems to be present for some applications that just have finished. The UI also reports this error. Upon restart of the SHS however the error goes away. > Unstable Spark history server: DB is closed > --- > > Key: SPARK-40442 > URL: https://issues.apache.org/jira/browse/SPARK-40442 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 3.2.2 >Reporter: Santosh Pingale >Priority: Minor > > Since we upgraded our spark history server to 3.2.2, it has been unstable. We > get
[jira] [Updated] (SPARK-40442) Unstable Spark history server: DB is closed
[ https://issues.apache.org/jira/browse/SPARK-40442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Santosh Pingale updated SPARK-40442: Summary: Unstable Spark history server: DB is closed (was: Unstable Spark history server) > Unstable Spark history server: DB is closed > --- > > Key: SPARK-40442 > URL: https://issues.apache.org/jira/browse/SPARK-40442 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 3.2.2 >Reporter: Santosh Pingale >Priority: Minor > > Since we upgraded our spark history server to 3.2.2, it has been unstable. We > get following log lines continuously. > {code:java} > 2022-09-15 08:54:57,000 WARN > /api/v1/applications/application_/1/executors > javax.servlet.ServletException: java.lang.IllegalStateException: DB is > closed. at > org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410) > at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346) > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366) > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319) > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205) > at > org.sparkproject.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) > at > org.sparkproject.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1631) > at > org.apache.spark.ui.HttpSecurityFilter.doFilter(HttpSecurityFilter.scala:95) > at > org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) > at > org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) > at > org.sparkproject.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:548) > at > org.sparkproject.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) > at > org.sparkproject.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1434) > at > org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) > at > org.sparkproject.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501) > at > org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) > at > org.sparkproject.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1349) > at > org.sparkproject.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) > at > org.sparkproject.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:763) > at > org.sparkproject.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) > at > org.sparkproject.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) > at org.sparkproject.jetty.server.Server.handle(Server.java:516) at > org.sparkproject.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:400) > at org.sparkproject.jetty.server.HttpChannel.dispatch(HttpChannel.java:645) > at org.sparkproject.jetty.server.HttpChannel.handle(HttpChannel.java:392) at > org.sparkproject.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) > at > org.sparkproject.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) > at org.sparkproject.jetty.io.FillInterest.fillable(FillInterest.java:105) at > org.sparkproject.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) at > org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) > at > org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) > at > 
org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) > at > org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) > at > org.sparkproject.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409) > at > org.sparkproject.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883) > at > org.sparkproject.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034) > at java.lang.Thread.run(Thread.java:748) Caused by: > java.lang.IllegalStateException: DB is closed. at > org.apache.spark.util.kvstore.LevelDB.db(LevelDB.java:364) at > org.apache.spark.util.kvstore.LevelDBIterator.(LevelDBIterator.java:51) at > org.apache.spark.util.kvstore.LevelDB$1.iterator(LevelDB.java:253) at > scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:60) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) at > scala.collection.IterableLike.foreach$(IterableLike.scala:73) at > scala.collection.AbstractIterable.foreach(Iterable.scala:56) at > scala.collection.TraversableLike.map(TraversableLike.scala:286) at >
[jira] [Updated] (SPARK-40442) Unstable Spark history server
[ https://issues.apache.org/jira/browse/SPARK-40442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Santosh Pingale updated SPARK-40442: Description: Since we upgraded our spark history server to 3.2.2, it has been unstable. We get following log lines continuously. {code:java} 2022-09-15 08:54:57,000 WARN /api/v1/applications/application_/1/executors javax.servlet.ServletException: java.lang.IllegalStateException: DB is closed. at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410) at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346) at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366) at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319) at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205) at org.sparkproject.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) at org.sparkproject.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1631) at org.apache.spark.ui.HttpSecurityFilter.doFilter(HttpSecurityFilter.scala:95) at org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) at org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) at org.sparkproject.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:548) at org.sparkproject.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) at org.sparkproject.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1434) at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) at org.sparkproject.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501) at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) at org.sparkproject.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1349) at org.sparkproject.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) at org.sparkproject.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:763) at org.sparkproject.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) at org.sparkproject.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) at org.sparkproject.jetty.server.Server.handle(Server.java:516) at org.sparkproject.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:400) at org.sparkproject.jetty.server.HttpChannel.dispatch(HttpChannel.java:645) at org.sparkproject.jetty.server.HttpChannel.handle(HttpChannel.java:392) at org.sparkproject.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) at org.sparkproject.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) at org.sparkproject.jetty.io.FillInterest.fillable(FillInterest.java:105) at org.sparkproject.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) at org.sparkproject.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409) at org.sparkproject.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883) at 
org.sparkproject.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.IllegalStateException: DB is closed. at org.apache.spark.util.kvstore.LevelDB.db(LevelDB.java:364) at org.apache.spark.util.kvstore.LevelDBIterator.(LevelDBIterator.java:51) at org.apache.spark.util.kvstore.LevelDB$1.iterator(LevelDB.java:253) at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:60) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.spark.status.AppStatusStore.executorList(AppStatusStore.scala:92) at org.apache.spark.deploy.history.HistoryAppStatusStore.executorList(HistoryAppStatusStore.scala:46) at org.apache.spark.status.api.v1.AbstractApplicationResource.$anonfun$executorList$1(OneApplicationResource.scala:53) at org.apache.spark.status.api.v1.BaseAppResource.$anonfun$withUI$1(ApiRootResource.scala:142) at org.apache.spark.deploy.history.ApplicationCache.withSparkUI(ApplicationCache.scala:121) at
[jira] [Created] (SPARK-40442) Unstable Spark history server
Santosh Pingale created SPARK-40442: --- Summary: Unstable Spark history server Key: SPARK-40442 URL: https://issues.apache.org/jira/browse/SPARK-40442 Project: Spark Issue Type: Bug Components: Web UI Affects Versions: 3.2.2 Reporter: Santosh Pingale Since we upgraded our spark history server to 3.2.2, it has been unstable. We get following log lines continuously. {code:java} 2022-09-15 08:54:57,000 WARN /api/v1/applications/application_/1/executors javax.servlet.ServletException: java.lang.IllegalStateException: DB is closed. at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410) at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346) at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366) at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319) at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205) at org.sparkproject.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) at org.sparkproject.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1631) at org.apache.spark.ui.HttpSecurityFilter.doFilter(HttpSecurityFilter.scala:95) at org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) at org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) at org.sparkproject.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:548) at org.sparkproject.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) at org.sparkproject.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1434) at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) at org.sparkproject.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501) at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) at org.sparkproject.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1349) at org.sparkproject.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) at org.sparkproject.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:763) at org.sparkproject.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) at org.sparkproject.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) at org.sparkproject.jetty.server.Server.handle(Server.java:516) at org.sparkproject.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:400) at org.sparkproject.jetty.server.HttpChannel.dispatch(HttpChannel.java:645) at org.sparkproject.jetty.server.HttpChannel.handle(HttpChannel.java:392) at org.sparkproject.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) at org.sparkproject.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) at org.sparkproject.jetty.io.FillInterest.fillable(FillInterest.java:105) at org.sparkproject.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) at org.sparkproject.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409) at 
org.sparkproject.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883) at org.sparkproject.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.IllegalStateException: DB is closed. at org.apache.spark.util.kvstore.LevelDB.db(LevelDB.java:364) at org.apache.spark.util.kvstore.LevelDBIterator.(LevelDBIterator.java:51) at org.apache.spark.util.kvstore.LevelDB$1.iterator(LevelDB.java:253) at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:60) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.spark.status.AppStatusStore.executorList(AppStatusStore.scala:92) at org.apache.spark.deploy.history.HistoryAppStatusStore.executorList(HistoryAppStatusStore.scala:46) at org.apache.spark.status.api.v1.AbstractApplicationResource.$anonfun$executorList$1(OneApplicationResource.scala:53) at
[jira] [Updated] (SPARK-40311) Introduce withColumnsRenamed
[ https://issues.apache.org/jira/browse/SPARK-40311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Santosh Pingale updated SPARK-40311: Docs Text: Add withColumnsRenamed to scala and pyspark API > Introduce withColumnsRenamed > > > Key: SPARK-40311 > URL: https://issues.apache.org/jira/browse/SPARK-40311 > Project: Spark > Issue Type: Improvement > Components: PySpark, SparkR, SQL >Affects Versions: 3.0.3, 3.1.3, 3.3.0, 3.2.2 >Reporter: Santosh Pingale >Priority: Minor > > Add a scala, pyspark, R dataframe API that can rename multiple columns in a > single command. Issues are faced when users iteratively perform > `withColumnRenamed`. > * When it works, we see slower performace > * In some cases, StackOverflowError is raised due to logical plan being too > big > * In a few cases, driver died due to memory consumption > Some reproducible benchmarks: > {code:java} > import datetime > import numpy as np > import pandas as pd > num_rows = 2 > num_columns = 100 > data = np.zeros((num_rows, num_columns)) > columns = map(str, range(num_columns)) > raw = spark.createDataFrame(pd.DataFrame(data, columns=columns)) > a = datetime.datetime.now() > for col in raw.columns: > raw = raw.withColumnRenamed(col, f"prefix_{col}") > b = datetime.datetime.now() > for col in raw.columns: > raw = raw.withColumnRenamed(col, f"prefix_{col}") > c = datetime.datetime.now() > for col in raw.columns: > raw = raw.withColumnRenamed(col, f"prefix_{col}") > d = datetime.datetime.now() > for col in raw.columns: > raw = raw.withColumnRenamed(col, f"prefix_{col}") > e = datetime.datetime.now() > for col in raw.columns: > raw = raw.withColumnRenamed(col, f"prefix_{col}") > f = datetime.datetime.now() > for col in raw.columns: > raw = raw.withColumnRenamed(col, f"prefix_{col}") > g = datetime.datetime.now() > g-a > datetime.timedelta(seconds=12, microseconds=480021) {code} > {code:java} > import datetime > import numpy as np > import pandas as pd > num_rows = 2 > num_columns = 100 > data = np.zeros((num_rows, num_columns)) > columns = map(str, range(num_columns)) > raw = spark.createDataFrame(pd.DataFrame(data, columns=columns)) > a = datetime.datetime.now() > raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in > raw.columns}), spark) > b = datetime.datetime.now() > raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in > raw.columns}), spark) > c = datetime.datetime.now() > raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in > raw.columns}), spark) > d = datetime.datetime.now() > raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in > raw.columns}), spark) > e = datetime.datetime.now() > raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in > raw.columns}), spark) > f = datetime.datetime.now() > raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in > raw.columns}), spark) > g = datetime.datetime.now() > g-a > datetime.timedelta(microseconds=632116) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
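For illustration, a hedged sketch of what the proposed PySpark surface could look like once this ticket lands: a single withColumnsRenamed call taking a dict, replacing the per-column rename loop benchmarked above. The method name and dict argument mirror the Scala call used in the benchmark; the exact Python signature is an assumption here.

{code:python}
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = np.zeros((2, 100))
columns = list(map(str, range(100)))
raw = spark.createDataFrame(pd.DataFrame(data, columns=columns))

# one rename over all columns instead of 100 chained withColumnRenamed projections
renamed = raw.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns})
print(renamed.columns[:3])
{code}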
[jira] [Updated] (SPARK-40087) Support multiple Column drop in R
[ https://issues.apache.org/jira/browse/SPARK-40087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Santosh Pingale updated SPARK-40087: Docs Text: Support for SparkR to drop multiple "Column" > Support multiple Column drop in R > - > > Key: SPARK-40087 > URL: https://issues.apache.org/jira/browse/SPARK-40087 > Project: Spark > Issue Type: New Feature > Components: R >Affects Versions: 3.3.0 >Reporter: Santosh Pingale >Assignee: Santosh Pingale >Priority: Minor > Fix For: 3.4.0 > > > This is a followup on SPARK-39895. The PR previously attempted to adjust > implementation for R as well to match signatures but that part was removed > and we only focused on getting python implementation to behave correctly. > *{{Change supports following operations:}}* > {{df <- select(read.json(jsonPath), "name", "age")}} > {{df$age2 <- df$age}} > {{df1 <- drop(df, df$age, df$name)}} > {{expect_equal(columns(df1), c("age2"))}} > {{df1 <- drop(df, df$age, column("random"))}} > {{expect_equal(columns(df1), c("name", "age2"))}} > {{df1 <- drop(df, df$age, df$name)}} > {{expect_equal(columns(df1), c("age2"))}} > > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-39895) pyspark drop doesn't accept *cols
[ https://issues.apache.org/jira/browse/SPARK-39895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Santosh Pingale updated SPARK-39895: Docs Text: Support for PySpark to drop multiple "Column" > pyspark drop doesn't accept *cols > -- > > Key: SPARK-39895 > URL: https://issues.apache.org/jira/browse/SPARK-39895 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 3.0.3, 3.3.0, 3.2.2 >Reporter: Santosh Pingale >Assignee: Santosh Pingale >Priority: Minor > Fix For: 3.4.0 > > > Pyspark dataframe drop has following signature: > {color:#4c9aff}{{def drop(self, *cols: "ColumnOrName") -> > "DataFrame":}}{color} > However when we try to pass multiple Column types to drop function it raises > TypeError > {{each col in the param list should be a string}} > *Minimal reproducible example:* > {color:#4c9aff}values = [("id_1", 5, 9), ("id_2", 5, 1), ("id_3", 4, 3), > ("id_1", 3, 3), ("id_2", 4, 3)]{color} > {color:#4c9aff}df = spark.createDataFrame(values, "id string, point int, > count int"){color} > |– id: string (nullable = true)| > |– point: integer (nullable = true)| > |– count: integer (nullable = true)| > {color:#4c9aff}{{df.drop(df.point, df.count)}}{color} > {quote}{color:#505f79}/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py > in drop(self, *cols){color} > {color:#505f79}2537 for col in cols:{color} > {color:#505f79}2538 if not isinstance(col, str):{color} > {color:#505f79}-> 2539 raise TypeError("each col in the param list should be > a string"){color} > {color:#505f79}2540 jdf = self._jdf.drop(self._jseq(cols)){color} > {color:#505f79}2541{color} > {color:#505f79}TypeError: each col in the param list should be a string{color} > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
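For reference, a minimal sketch of the behaviour the fix targets (reported fixed for 3.4.0): drop accepting several Column arguments, matching the *cols: "ColumnOrName" signature quoted above. Bracket access is used here instead of df.count because count would resolve to the DataFrame method rather than a Column.

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

values = [("id_1", 5, 9), ("id_2", 5, 1), ("id_3", 4, 3), ("id_1", 3, 3), ("id_2", 4, 3)]
df = spark.createDataFrame(values, "id string, point int, count int")

# On affected versions this raises TypeError: each col in the param list should be a string;
# with the fix both columns are dropped and only "id" remains.
df.drop(df["point"], df["count"]).printSchema()
{code}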
[jira] [Updated] (SPARK-40311) Introduce withColumnsRenamed
[ https://issues.apache.org/jira/browse/SPARK-40311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Santosh Pingale updated SPARK-40311: Description: Add a scala, pyspark, R dataframe API that can rename multiple columns in a single command. Issues are faced when users iteratively perform `withColumnRenamed`. * When it works, we see slower performace * In some cases, StackOverflowError is raised due to logical plan being too big * In a few cases, driver died due to memory consumption Some reproducible benchmarks: {code:java} import datetime import numpy as np import pandas as pd num_rows = 2 num_columns = 100 data = np.zeros((num_rows, num_columns)) columns = map(str, range(num_columns)) raw = spark.createDataFrame(pd.DataFrame(data, columns=columns)) a = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") b = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") c = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") d = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") e = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") f = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") g = datetime.datetime.now() g-a datetime.timedelta(seconds=12, microseconds=480021) {code} {code:java} import datetime import numpy as np import pandas as pd num_rows = 2 num_columns = 100 data = np.zeros((num_rows, num_columns)) columns = map(str, range(num_columns)) raw = spark.createDataFrame(pd.DataFrame(data, columns=columns)) a = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) b = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) c = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) d = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) e = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) f = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) g = datetime.datetime.now() g-a datetime.timedelta(microseconds=632116) {code} was: Add a scala, pyspark, R dataframe API that can rename multiple columns in a single command. This is mostly a performance related optimisations where users iteratively perform `withColumnRenamed`. With 100s columns and multiple iterations, there are cases where either driver will blow up or users will receive a StackOverflowError. 
{code:java} import datetime import numpy as np import pandas as pd num_rows = 2 num_columns = 100 data = np.zeros((num_rows, num_columns)) columns = map(str, range(num_columns)) raw = spark.createDataFrame(pd.DataFrame(data, columns=columns)) a = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") b = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") c = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") d = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") e = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") f = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") g = datetime.datetime.now() g-a datetime.timedelta(seconds=12, microseconds=480021) {code} {code:java} import datetime import numpy as np import pandas as pd num_rows = 2 num_columns = 100 data = np.zeros((num_rows, num_columns)) columns = map(str, range(num_columns)) raw = spark.createDataFrame(pd.DataFrame(data, columns=columns)) a = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) b = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) c = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) d = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) e = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) f = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}),
[jira] [Updated] (SPARK-40311) Introduce withColumnsRenamed
[ https://issues.apache.org/jira/browse/SPARK-40311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Santosh Pingale updated SPARK-40311: Description: Add a scala, pyspark, R dataframe API that can rename multiple columns in a single command. This is mostly a performance related optimisations where users iteratively perform `withColumnRenamed`. With 100s columns and multiple iterations, there are cases where either driver will blow up or users will receive a StackOverflowError. {code:java} import datetime import numpy as np import pandas as pd num_rows = 2 num_columns = 100 data = np.zeros((num_rows, num_columns)) columns = map(str, range(num_columns)) raw = spark.createDataFrame(pd.DataFrame(data, columns=columns)) a = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") b = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") c = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") d = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") e = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") f = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") g = datetime.datetime.now() g-a datetime.timedelta(seconds=12, microseconds=480021) {code} {code:java} import datetime import numpy as np import pandas as pd num_rows = 2 num_columns = 100 data = np.zeros((num_rows, num_columns)) columns = map(str, range(num_columns)) raw = spark.createDataFrame(pd.DataFrame(data, columns=columns)) a = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) b = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) c = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) d = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) e = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) f = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) g = datetime.datetime.now() g-a datetime.timedelta(microseconds=632116) {code} was: Add a scala, pyspark, R dataframe API that can rename multiple columns in a single command. This is mostly a performance related optimisations where users iteratively perform `withColumnRenamed`. With 100s columns and multiple iterations, there are cases where either driver will blow up or users will receive a StackOverflowError. 
{code:java} import datetime import numpy as np import pandas as pd num_rows = 2 num_columns = 100 data = np.zeros((num_rows, num_columns)) columns = map(str, range(num_columns)) raw = spark.createDataFrame(pd.DataFrame(data, columns=columns)) a = datetime.datetime.now() {col, f"prefix_{col}" for col in raw.columns} for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") b = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") c = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") d = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") e = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") f = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") g = datetime.datetime.now() g-a datetime.timedelta(seconds=12, microseconds=480021) {code} {code:java} import datetime import numpy as np import pandas as pd num_rows = 2 num_columns = 100 data = np.zeros((num_rows, num_columns)) columns = map(str, range(num_columns)) raw = spark.createDataFrame(pd.DataFrame(data, columns=columns)) a = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) b = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) c = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) d = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) e = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) f = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in
[jira] [Updated] (SPARK-40311) Introduce withColumnsRenamed
[ https://issues.apache.org/jira/browse/SPARK-40311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Santosh Pingale updated SPARK-40311: Description: Add a scala, pyspark, R dataframe API that can rename multiple columns in a single command. This is mostly a performance related optimisations where users iteratively perform `withColumnRenamed`. With 100s columns and multiple iterations, there are cases where either driver will blow up or users will receive a StackOverflowError. {code:java} import datetime import numpy as np import pandas as pd num_rows = 2 num_columns = 100 data = np.zeros((num_rows, num_columns)) columns = map(str, range(num_columns)) raw = spark.createDataFrame(pd.DataFrame(data, columns=columns)) a = datetime.datetime.now() {col, f"prefix_{col}" for col in raw.columns} for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") b = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") c = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") d = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") e = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") f = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") g = datetime.datetime.now() g-a datetime.timedelta(seconds=12, microseconds=480021) {code} {code:java} import datetime import numpy as np import pandas as pd num_rows = 2 num_columns = 100 data = np.zeros((num_rows, num_columns)) columns = map(str, range(num_columns)) raw = spark.createDataFrame(pd.DataFrame(data, columns=columns)) a = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) b = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) c = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) d = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) e = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) f = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) g = datetime.datetime.now() g-a datetime.timedelta(microseconds=632116) {code} was: Add a scala, pyspark, R dataframe API that can rename multiple columns in a single command. This is mostly a performance related optimisations where users iteratively perform `withColumnRenamed`. With 100s columns and multiple iterations, there are cases where either driver will blow up or users will receive a StackOverflowError. 
{code:java} import datetime import numpy as np import pandas as pd num_rows = 2 num_columns = 100 data = np.zeros((num_rows, num_columns)) columns = map(str, range(num_columns)) raw = spark.createDataFrame(pd.DataFrame(data, columns=columns)) a = datetime.datetime.now() {col, f"prefix_{col}" for col in raw.columns} for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") b = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") c = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") d = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") e = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") f = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") g = datetime.datetime.now() g-a datetime.timedelta(seconds=12, microseconds=480021) {code} {code:java} import datetime import numpy as np import pandas as pd num_rows = 2 num_columns = 100 data = np.zeros((num_rows, num_columns)) columns = map(str, range(num_columns)) raw = spark.createDataFrame(pd.DataFrame(data, columns=columns)) a = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) b = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) c = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) d = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) e = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) f = datetime.datetime.now() raw =
[jira] [Created] (SPARK-40311) Introduce withColumnsRenamed
Santosh Pingale created SPARK-40311: --- Summary: Introduce withColumnsRenamed Key: SPARK-40311 URL: https://issues.apache.org/jira/browse/SPARK-40311 Project: Spark Issue Type: Improvement Components: PySpark, SparkR, SQL Affects Versions: 3.2.2, 3.3.0, 3.1.3, 3.0.3 Reporter: Santosh Pingale Add a scala, pyspark, R dataframe API that can rename multiple columns in a single command. This is mostly a performance related optimisations where users iteratively perform `withColumnRenamed`. With 100s columns and multiple iterations, there are cases where either driver will blow up or users will receive a StackOverflowError. {code:java} import datetime import numpy as np import pandas as pd num_rows = 2 num_columns = 100 data = np.zeros((num_rows, num_columns)) columns = map(str, range(num_columns)) raw = spark.createDataFrame(pd.DataFrame(data, columns=columns)) a = datetime.datetime.now() {col, f"prefix_{col}" for col in raw.columns} for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") b = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") c = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") d = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") e = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") f = datetime.datetime.now() for col in raw.columns: raw = raw.withColumnRenamed(col, f"prefix_{col}") g = datetime.datetime.now() g-a datetime.timedelta(seconds=12, microseconds=480021) {code} {code:java} import datetime import numpy as np import pandas as pd num_rows = 2 num_columns = 100 data = np.zeros((num_rows, num_columns)) columns = map(str, range(num_columns)) raw = spark.createDataFrame(pd.DataFrame(data, columns=columns)) a = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) b = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) c = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) d = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) e = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) f = datetime.datetime.now() raw = DataFrame(raw._jdf.withColumnsRenamed({col: f"prefix_{col}" for col in raw.columns}), spark) g = datetime.datetime.now() g-a datetime.timedelta(microseconds=632116) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-39895) pyspark drop doesn't accept *cols
[ https://issues.apache.org/jira/browse/SPARK-39895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17598559#comment-17598559 ] Santosh Pingale commented on SPARK-39895: - I am not sure I understand your confusion. > pyspark drop doesn't accept *cols > -- > > Key: SPARK-39895 > URL: https://issues.apache.org/jira/browse/SPARK-39895 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 3.0.3, 3.3.0, 3.2.2 >Reporter: Santosh Pingale >Assignee: Santosh Pingale >Priority: Minor > Fix For: 3.4.0 > > > Pyspark dataframe drop has following signature: > {color:#4c9aff}{{def drop(self, *cols: "ColumnOrName") -> > "DataFrame":}}{color} > However when we try to pass multiple Column types to drop function it raises > TypeError > {{each col in the param list should be a string}} > *Minimal reproducible example:* > {color:#4c9aff}values = [("id_1", 5, 9), ("id_2", 5, 1), ("id_3", 4, 3), > ("id_1", 3, 3), ("id_2", 4, 3)]{color} > {color:#4c9aff}df = spark.createDataFrame(values, "id string, point int, > count int"){color} > |– id: string (nullable = true)| > |– point: integer (nullable = true)| > |– count: integer (nullable = true)| > {color:#4c9aff}{{df.drop(df.point, df.count)}}{color} > {quote}{color:#505f79}/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py > in drop(self, *cols){color} > {color:#505f79}2537 for col in cols:{color} > {color:#505f79}2538 if not isinstance(col, str):{color} > {color:#505f79}-> 2539 raise TypeError("each col in the param list should be > a string"){color} > {color:#505f79}2540 jdf = self._jdf.drop(self._jseq(cols)){color} > {color:#505f79}2541{color} > {color:#505f79}TypeError: each col in the param list should be a string{color} > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
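For context on the confusion above, a hedged sketch of the behaviour the fix is meant to enable, based on the `*cols: "ColumnOrName"` signature quoted in this thread and the 3.4.0 fix version recorded on the ticket (expected outputs shown as comments):
{code:java}
values = [("id_1", 5, 9), ("id_2", 5, 1), ("id_3", 4, 3)]
df = spark.createDataFrame(values, "id string, point int, count int")

# With the fix, Column objects and plain strings can be passed together.
# df["count"] is used instead of df.count, since .count is shadowed by the method.
df.drop(df.point, df["count"]).columns
# ['id']

df.drop("point", df["count"]).columns
# ['id']
{code}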
[jira] [Commented] (SPARK-21487) WebUI-Executors Page results in "Request is a replay (34) attack"
[ https://issues.apache.org/jira/browse/SPARK-21487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17581351#comment-17581351 ] Santosh Pingale commented on SPARK-21487: - I believe this is still an issue with kerberised Hadoop clusters, and sometimes a major one when you have to debug something. It is present in all the secured Hadoop clusters I have worked with. At my current org I finally managed to get it working by patching Spark internally. *What's happening:* At some point, the Spark UI started fetching mustache templates over an AJAX call to improve rendering performance. Those files have an `.html` extension, and here YARN applies the filter twice! *What could be done:* While YARN is causing the issue, Spark can make itself agnostic to it and let users use the Spark UI in its true form. The mustache templates should really use a `.mustache` extension instead of `.html`; this change alone allows the templates to render properly. I have tested that it works both locally and on a cluster. I can raise a PR and we can discuss the details there. > WebUI-Executors Page results in "Request is a replay (34) attack" > - > > Key: SPARK-21487 > URL: https://issues.apache.org/jira/browse/SPARK-21487 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 2.1.1 >Reporter: ShuMing Li >Priority: Minor > > We upgraded Spark version from 2.0.2 to 2.1.1 recently, WebUI `Executors > Page` becomed empty, with the exception below. > `Executor Page` rendering using javascript language rather than scala in > 2.1.1, but I don't know why causes this result? > "two queries are submitted at the same time and have the same timestamp may > cause this result", but I'm not sure? > ResouceManager log: > {code:java} > 2017-07-20 20:39:09,371 WARN > org.apache.hadoop.security.authentication.server.AuthenticationFilter: > Authentication exception: GSSException: Failure unspecified at GSS-API level > (Mechanism level: Request is a replay (34)) > {code} > Safari explorer console > {code:java} > Failed to load resource: the server responded with a status of 403 > (GSSException: Failure unspecified at GSS-API level (Mechanism level: Request > is a replay > (34)))http://hadoop-rm-host:8088/proxy/application_1494564992156_2751285/static/executorspage-template.html > {code} > Related Links: > https://issues.apache.org/jira/browse/HIVE-12481 > https://issues.apache.org/jira/browse/HADOOP-8830 -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-40087) Support multiple Column drop in R
[ https://issues.apache.org/jira/browse/SPARK-40087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Santosh Pingale updated SPARK-40087: Description: This is a followup on SPARK-39895. The PR previously attempted to adjust implementation for R as well to match signatures but that part was removed and we only focused on getting python implementation to behave correctly. *{{Change supports following operations:}}* {{df <- select(read.json(jsonPath), "name", "age")}} {{df$age2 <- df$age}} {{df1 <- drop(df, df$age, df$name)}} {{expect_equal(columns(df1), c("age2"))}} {{df1 <- drop(df, df$age, column("random"))}} {{expect_equal(columns(df1), c("name", "age2"))}} {{df1 <- drop(df, df$age, df$name)}} {{expect_equal(columns(df1), c("age2"))}} was: This is a followup on SPARK-39895. The PR previously attempted to adjust implementation for R as well to match signatures but that part was removed and we only focused on getting python implementation to behave correctly. *{{Change supports following operations:}}* {{df <- select(read.json(jsonPath), "name", "age")}} {{df$age2 <- df$age}} {{df1 <- drop(df, df$age, df$name)}} {{expect_equal(columns(df1), c("age2"))}} {{df1 <- drop(df, list(df$age, column("random")))}} {{expect_equal(columns(df1), c("name", "age2"))}} {{df1 <- drop(df, list(df$age, df$name))}} {{expect_equal(columns(df1), c("age2"))}} > Support multiple Column drop in R > - > > Key: SPARK-40087 > URL: https://issues.apache.org/jira/browse/SPARK-40087 > Project: Spark > Issue Type: New Feature > Components: R >Affects Versions: 3.3.0 >Reporter: Santosh Pingale >Priority: Minor > > This is a followup on SPARK-39895. The PR previously attempted to adjust > implementation for R as well to match signatures but that part was removed > and we only focused on getting python implementation to behave correctly. > *{{Change supports following operations:}}* > {{df <- select(read.json(jsonPath), "name", "age")}} > {{df$age2 <- df$age}} > {{df1 <- drop(df, df$age, df$name)}} > {{expect_equal(columns(df1), c("age2"))}} > {{df1 <- drop(df, df$age, column("random"))}} > {{expect_equal(columns(df1), c("name", "age2"))}} > {{df1 <- drop(df, df$age, df$name)}} > {{expect_equal(columns(df1), c("age2"))}} > > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-40119) Add reason for cancelJobGroup
Santosh Pingale created SPARK-40119: --- Summary: Add reason for cancelJobGroup Key: SPARK-40119 URL: https://issues.apache.org/jira/browse/SPARK-40119 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.3.0 Reporter: Santosh Pingale Currently, `cancelJob` supports passing the reason for failure. We use `cancelJobGroup` in a few places with async actions. It would be great to be able to pass the reason for cancellation to the job group as well. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
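A short sketch of how this might look from PySpark. `setJobGroup` and `cancelJobGroup` are existing SparkContext APIs; the `reason` argument shown at the end is the proposed, hypothetical addition mirroring `cancelJob`, and the group id is made up for illustration.
{code:java}
sc = spark.sparkContext

# Tag the jobs submitted next with a group id (existing API).
sc.setJobGroup("nightly-export", "export to object store", interruptOnCancel=True)

# ... async action kicked off here, e.g. a write running on another thread ...

# Existing API: the whole group can be cancelled, but no reason can be attached.
sc.cancelJobGroup("nightly-export")

# Proposed (hypothetical) signature, mirroring cancelJob's reason parameter:
# sc.cancelJobGroup("nightly-export", reason="superseded by a rerun")
{code}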
[jira] [Created] (SPARK-40087) Support multiple Column drop in R
Santosh Pingale created SPARK-40087: --- Summary: Support multiple Column drop in R Key: SPARK-40087 URL: https://issues.apache.org/jira/browse/SPARK-40087 Project: Spark Issue Type: New Feature Components: R Affects Versions: 3.3.0 Reporter: Santosh Pingale This is a follow-up to SPARK-39895. The PR previously attempted to adjust the R implementation as well to match signatures, but that part was removed and we focused only on getting the Python implementation to behave correctly. *{{The change supports the following operations:}}* {{df <- select(read.json(jsonPath), "name", "age")}} {{df$age2 <- df$age}} {{df1 <- drop(df, df$age, df$name)}} {{expect_equal(columns(df1), c("age2"))}} {{df1 <- drop(df, list(df$age, column("random")))}} {{expect_equal(columns(df1), c("name", "age2"))}} {{df1 <- drop(df, list(df$age, df$name))}} {{expect_equal(columns(df1), c("age2"))}} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-39895) pyspark drop doesn't accept *cols
Santosh Pingale created SPARK-39895: --- Summary: pyspark drop doesn't accept *cols Key: SPARK-39895 URL: https://issues.apache.org/jira/browse/SPARK-39895 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 3.2.2, 3.3.0, 3.0.3 Reporter: Santosh Pingale The PySpark DataFrame `drop` method has the following signature: {color:#4c9aff}{{def drop(self, *cols: "ColumnOrName") -> "DataFrame":}}{color} However, when we try to pass multiple Column objects to `drop`, it raises a TypeError: {{each col in the param list should be a string}} *Minimal reproducible example:* {color:#4c9aff}values = [("id_1", 5, 9), ("id_2", 5, 1), ("id_3", 4, 3), ("id_1", 3, 3), ("id_2", 4, 3)]{color} {color:#4c9aff}df = spark.createDataFrame(values, "id string, point int, count int"){color} |– id: string (nullable = true)| |– point: integer (nullable = true)| |– count: integer (nullable = true)| {color:#4c9aff}{{df.drop(df.point, df.count)}}{color} {quote}{color:#505f79}/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py in drop(self, *cols){color} {color:#505f79}2537 for col in cols:{color} {color:#505f79}2538 if not isinstance(col, str):{color} {color:#505f79}-> 2539 raise TypeError("each col in the param list should be a string"){color} {color:#505f79}2540 jdf = self._jdf.drop(self._jseq(cols)){color} {color:#505f79}2541{color} {color:#505f79}TypeError: each col in the param list should be a string{color} {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
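On the affected versions, a hedged sketch of two possible workarounds using the repro above, assuming the standard PySpark DataFrame API: the multi-argument branch of `drop` only accepts strings, while the single-argument branch already accepts a Column object.
{code:java}
values = [("id_1", 5, 9), ("id_2", 5, 1), ("id_3", 4, 3), ("id_1", 3, 3), ("id_2", 4, 3)]
df = spark.createDataFrame(values, "id string, point int, count int")

# Workaround 1: pass plain column names; the multi-arg branch only accepts strings.
df.drop("point", "count").columns
# ['id']

# Workaround 2: chain single-Column drops; the single-arg branch accepts a Column.
# df["count"] is used because df.count is shadowed by the DataFrame.count method.
df.drop(df.point).drop(df["count"]).columns
# ['id']
{code}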