Executor tab missing information
Hello All, I am executing Spark jobs, but the Executor tab shows no information — no data comes up at all. Please let me know what I am missing.
Re: Executor metrics are missing on Prometheus sink
Hi Luca,

Thanks for your reply, which is very helpful for me :) I am trying other metrics sinks with cAdvisor to see the effect. If it works well, I will share it with the community.

On Fri, Feb 10, 2023 at 4:26 PM Luca Canali wrote:

> Hi Qian,
>
> Indeed the metrics available with the Prometheus servlet sink (which is
> still marked as experimental) are limited compared to the full
> instrumentation, and this is due to the way it is implemented with a
> servlet; it cannot easily be extended, from what I can see.
>
> You can use another supported metrics sink (see
> https://spark.apache.org/docs/latest/monitoring.html#metrics ) if you
> want to collect all the metrics exposed by Spark executors.
>
> For example, I use the Graphite sink and then collect metrics into an
> InfluxDB instance (see https://github.com/cerndb/spark-dashboard ).
>
> An additional comment is that there is room for having more sinks
> available for Apache Spark metrics, notably for InfluxDB and for Prometheus
> (gateway), if someone is interested in working on that.
>
> Best,
> Luca
>
> *From:* Qian Sun
> *Sent:* Friday, February 10, 2023 05:05
> *To:* dev ; user.spark
> *Subject:* Executor metrics are missing on prometheus sink
>
> Setting up prometheus sink in this way:
>
> -c spark.ui.prometheus.enabled=true
> -c spark.executor.processTreeMetrics.enabled=true
> -c spark.metrics.conf=/spark/conf/metric.properties
>
> metric.properties:
>
> *.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
> *.sink.prometheusServlet.path=/metrics/prometheus
>
> Result: both of these endpoints have some metrics:
>
> :4040/metrics/prometheus
> :4040/metrics/executors/prometheus
>
> But the executor one misses the metrics under the executor namespace
> described here:
> https://spark.apache.org/docs/latest/monitoring.html#component-instance--executor
>
> *How to expose executor metrics on Spark executor pods?*
>
> *Any help will be appreciated.*
>
> --
> Regards,
> Qian Sun

--
Regards,
Qian Sun
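[Editor's note] For reference, a minimal metrics configuration sketch for the Graphite sink that Luca mentions, based on the sink properties described in the Spark monitoring docs. The host, port, and prefix values are placeholders, not taken from this thread, and would need to point at your own Graphite-compatible endpoint (e.g. the graphite listener of an InfluxDB instance):

```
# metric.properties — send metrics from all instances (driver, executors, ...)
# to a Graphite-compatible endpoint.
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=graphite.example.com
*.sink.graphite.port=2003
*.sink.graphite.period=10
*.sink.graphite.unit=seconds
*.sink.graphite.prefix=spark
```

Unlike the Prometheus servlet, this sink pushes the full executor-namespace metrics from each executor process, which is why it avoids the limitation discussed above.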
Running Spark on Kubernetes (GKE) - failing on spark-submit
Hello All, I'm trying to run a simple application on GKE (Kubernetes), and it is failing.

Note: I have Spark (Bitnami Spark chart) installed on GKE using helm install.

Here is what was done:

1. Created a docker image using this Dockerfile:

```
FROM python:3.7-slim
RUN apt-get update && \
    apt-get install -y default-jre && \
    apt-get install -y openjdk-11-jre-headless && \
    apt-get clean
ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64
RUN pip install pyspark
RUN mkdir -p /myexample && chmod 755 /myexample
WORKDIR /myexample
COPY src/StructuredStream-on-gke.py /myexample/StructuredStream-on-gke.py
CMD ["pyspark"]
```

2. Simple PySpark application:

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StructuredStreaming-on-gke").getOrCreate()

data = [('k1', 123000), ('k2', 234000), ('k3', 456000)]
df = spark.createDataFrame(data, ('id', 'salary'))
df.show(5, False)
```

3. spark-submit command:

```
spark-submit --master k8s://https://34.74.22.140:7077 \
  --deploy-mode cluster \
  --name pyspark-example \
  --conf spark.kubernetes.container.image=pyspark-example:0.1 \
  --conf spark.kubernetes.file.upload.path=/myexample \
  src/StructuredStream-on-gke.py
```

Error I get:

```
23/02/13 13:18:27 INFO KubernetesUtils: Uploading file: /Users/karanalang/PycharmProjects/Kafka/pyspark-docker/src/StructuredStream-on-gke.py to dest: /myexample/spark-upload-12228079-d652-4bf3-b907-3810d275124a/StructuredStream-on-gke.py...
Exception in thread "main" org.apache.spark.SparkException: Uploading file /Users/karanalang/PycharmProjects/Kafka/pyspark-docker/src/StructuredStream-on-gke.py failed...
	at org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileUri(KubernetesUtils.scala:296)
	at org.apache.spark.deploy.k8s.KubernetesUtils$.renameMainAppResource(KubernetesUtils.scala:270)
	at org.apache.spark.deploy.k8s.features.DriverCommandFeatureStep.configureForPython(DriverCommandFeatureStep.scala:109)
	at org.apache.spark.deploy.k8s.features.DriverCommandFeatureStep.configurePod(DriverCommandFeatureStep.scala:44)
	at org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder.$anonfun$buildFromFeatures$3(KubernetesDriverBuilder.scala:59)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:89)
	at org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder.buildFromFeatures(KubernetesDriverBuilder.scala:58)
	at org.apache.spark.deploy.k8s.submit.Client.run(KubernetesClientApplication.scala:106)
	at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$3(KubernetesClientApplication.scala:213)
	at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$3$adapted(KubernetesClientApplication.scala:207)
	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2622)
	at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.run(KubernetesClientApplication.scala:207)
	at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.start(KubernetesClientApplication.scala:179)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Error uploading file StructuredStream-on-gke.py
	at org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileToHadoopCompatibleFS(KubernetesUtils.scala:319)
	at org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileUri(KubernetesUtils.scala:292)
	... 21 more
Caused by: java.io.IOException: Mkdirs failed to create /myexample/spark-upload-12228079-d652-4bf3-b907-3810d275124a
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:317)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:305)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:987)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:414)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:387)
	at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:2369)
	at org.apache.hadoop.fs.FilterFileSystem.copyFromLocalFile(FilterFileSystem.java:368)
	at org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileToHadoopCompatibleFS(KubernetesUtils.scala:316)
	... 22 more
```

Any ideas on how to fix this and get it to work? tia!

Pls see the stackoverflow link: https://stackoverflow.com/questions/75441360/running-spark-application-on-gke-failing-on-spark-submit
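[Editor's note] The `Mkdirs failed` at the bottom of the trace suggests the submission client is trying to create the upload directory on its local filesystem: in cluster mode, `spark.kubernetes.file.upload.path` must point to a Hadoop-compatible shared filesystem reachable by both the client and the driver, not a local path like `/myexample`. A sketch of a possible fix using a GCS bucket — `gs://my-spark-uploads` is a placeholder, and it assumes the GCS connector is on the spark-submit classpath:

```
spark-submit \
  --master k8s://https://34.74.22.140:443 \
  --deploy-mode cluster \
  --name pyspark-example \
  --conf spark.kubernetes.container.image=pyspark-example:0.1 \
  --conf spark.kubernetes.file.upload.path=gs://my-spark-uploads/spark-upload \
  src/StructuredStream-on-gke.py
```

Note also that with the `k8s://` scheme the master URL should be the Kubernetes API server endpoint (as reported by `kubectl cluster-info`, typically port 443 or 6443), not 7077, which is Spark's standalone-master port.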
[Spark Core] Spark data loss/data duplication when executors die
Hi,

We are facing this issue when we convert RDD -> Dataset followed by repartition + write. We are using spot instances on k8s, which means they can die at any moment. And when they do during this phase, we very often see data duplication happening.

Pseudo job code:

```
val rdd = data.map(...)
val ds = spark.createDataset(rdd, classEncoder)
ds.repartition(N)
  .write
  .format("parquet")
  .mode("overwrite")
  .save(path)
```

If I kill an executor pod during the repartition stage, I can reproduce the issue. If I instead move the repartition to happen on the RDD instead of the Dataset, I cannot reproduce the issue.

Is this a bug in Spark lineage when going from RDD -> DS/DF -> repartition when an executor drops? There is no randomness in the map function on the RDD, before you ask 😊

Thanks,
Erik
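[Editor's note] For readers hitting the same symptom, here is a Scala sketch of the workaround Erik describes — doing the repartition while the data is still an RDD, so the shuffle sits on the RDD lineage rather than the Dataset plan. `data`, `classEncoder`, `N`, and `path` are placeholders carried over from the pseudo code above; this is untested and only illustrates the reordering:

```
// Workaround sketch: repartition on the RDD before converting to a Dataset.
val rdd = data.map(...)                 // deterministic map, as in the original job
val repartitioned = rdd.repartition(N)  // shuffle happens on the RDD lineage

val ds = spark.createDataset(repartitioned)(classEncoder)
ds.write
  .format("parquet")
  .mode("overwrite")
  .save(path)
```

Per the thread, the duplication reproduces only when the repartition is applied to the Dataset, so moving it onto the RDD is a mitigation, not a confirmed fix for the underlying lineage behavior.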
Re: How to improve efficiency of this piece of code (returning distinct column values)
Alright, this is the working Java version of it:

```
List<Column> listCols = new ArrayList<>();
Arrays.asList(dataset.columns()).forEach(column -> {
    listCols.add(org.apache.spark.sql.functions.collect_set(column));
});
Column[] arrCols = listCols.toArray(new Column[listCols.size()]);
dataset = dataset.select(arrCols);
```

But then I tried to explode the sets of values into rows through explode(), and the column values repeat to fill the size of the largest column. How can I set the repeated values to null instead (thus keeping only one exploded set of column values in each column)?

Thanks.

On Sun, Feb 12, 2023 at 22:43, Enrico Minack wrote:

> @Sean: This aggregate function does work without an explicit groupBy():
>
> ./spark-3.3.1-bin-hadoop2/bin/spark-shell
> Spark context Web UI available at http://*:4040
> Spark context available as 'sc' (master = local[*], app id = local-1676237726079).
> Spark session available as 'spark'.
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/ '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 3.3.1
>       /_/
>
> Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.17)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
> scala> val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10, "one")).toDF("a", "b", "c")
> scala> df.select(df.columns.map(column => collect_set(col(column)).as(column)): _*).show()
> +------------+--------+----------+
> |           a|       b|         c|
> +------------+--------+----------+
> |[1, 2, 3, 4]|[20, 10]|[one, two]|
> +------------+--------+----------+
>
> @Sam: I haven't tested the Java code, sorry. I presume you can work it out
> from the working Scala code.
>
> Enrico
>
> On 12.02.23 at 21:32, Sean Owen wrote:
>
> It doesn't work because it's an aggregate function. You have to groupBy()
> (group by nothing) to make that work, but you can't assign that as a
> column. Folks, those approaches don't make sense semantically in SQL or
> Spark or anything.
> They just mean use threads to collect() distinct values for each col in
> parallel using threads in your program. You don't have to, but you could.
> What else are we looking for here? The answer has been given a number of
> times, I think.
>
> On Sun, Feb 12, 2023 at 2:28 PM sam smith wrote:
>
>> OK, what do you mean by "do your outer for loop in parallel"?
>> btw this didn't work:
>>
>> for (String columnName : df.columns()) {
>>     df = df.withColumn(columnName, collect_set(col(columnName)).as(columnName));
>> }
>>
>> On Sun, Feb 12, 2023 at 20:36, Enrico Minack wrote:
>>
>>> That is unfortunate, but 3.4.0 is around the corner, really!
>>>
>>> Well, then based on your code, I'd suggest two improvements:
>>> - cache your dataframe after reading; this way, you don't read the
>>>   entire file for each column
>>> - do your outer for loop in parallel; then you have N parallel Spark
>>>   jobs (this only helps if your Spark cluster is not fully occupied by a
>>>   single column)
>>>
>>> Your withColumn approach does not work because withColumn expects a
>>> column as the second argument, but df.select(columnName).distinct() is a
>>> DataFrame and .col is a column in *that* DataFrame; it is not a column
>>> of the dataframe that you call withColumn on.
>>>
>>> It should read:
>>>
>>> Scala:
>>> df.select(df.columns.map(column => collect_set(col(column)).as(column)): _*).show()
>>>
>>> Java:
>>> List<Column> columns = new ArrayList<>();
>>> for (String columnName : df.columns()) {
>>>     columns.add(collect_set(col(columnName)).as(columnName));
>>> }
>>> df = df.select(columns.toArray(new Column[0]));
>>>
>>> Then you have a single DataFrame that computes all columns in a single
>>> Spark job.
>>>
>>> But this reads all distinct values into a single partition, which has
>>> the same downside as collect, so this is as bad as using collect.
>>> Cheers,
>>> Enrico
>>>
>>> On 12.02.23 at 18:05, sam smith wrote:
>>>
>>> @Enrico Minack Thanks for "unpivot", but I am
>>> using version 3.3.0 (you are taking it way too far as usual :) )
>>> @Sean Owen Pls then show me how it can be improved
>>> by code.
>>>
>>> Also, why doesn't such an approach (using withColumn()) work:
>>>
>>> for (String columnName : df.columns()) {
>>>     df = df.withColumn(columnName, df.select(columnName).distinct().col(columnName));
>>> }
>>>
>>> On Sat, Feb 11, 2023 at 13:11, Enrico Minack wrote:
>>>
>>> You could do the entire thing in DataFrame world and write the result to
>>> disk. All you need is unpivot (to be released in Spark 3.4.0, soon).
>>> Note this is Scala but should be straightforward to translate into Java:
>>>
>>> import org.apache.spark.sql.functions.collect_set
>>>
>>> val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10, 123)).toDF("a", "b", "c")
>>>
>>> df.unpivot(Array.empty, "column", "value")
>>>   .groupBy("column")
>>>   .agg(collect_set("value").as("distinct_values"))
>>>
>>> The unpivot opera
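[Editor's note] Regarding sam's follow-up question about exploding the collected sets into rows with nulls instead of repeated values: one possible Scala sketch (untested; builds on Enrico's collect_set select) is to posexplode each collected set separately and full-outer-join the pieces on position, so that columns with fewer distinct values get null past their length:

```
import org.apache.spark.sql.functions.{col, collect_set, posexplode}

// One row whose columns each hold the distinct values of that column.
val aggregated = df.select(df.columns.map(c => collect_set(col(c)).as(c)): _*)

// Explode each set into (pos, value) rows — one small DataFrame per column.
val perColumn = df.columns.map { c =>
  aggregated.select(posexplode(col(c)).as(Seq("pos", c)))
}

// Full outer join on position: shorter columns yield null beyond their length,
// instead of explode()'s cross-product repetition.
val result = perColumn
  .reduce((l, r) => l.join(r, Seq("pos"), "full_outer"))
  .orderBy("pos")
  .drop("pos")
```

The per-column DataFrames here are tiny (one row per distinct value), so the joins are cheap; the same caveat from the thread still applies, though, since all distinct values pass through a single partition.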