Executor tab missing information

2023-02-13 Thread Prem Sahoo
Hello All,
I am executing Spark jobs, but the Executors tab is missing information; I
can't see any data coming up. Please let me know what I am missing.


Re: Executor metrics are missing on Prometheus sink

2023-02-13 Thread Qian Sun
Hi Luca,

Thanks for your reply, which is very helpful for me :)

I am trying other metrics sinks with cAdvisor to see the effect. If it
works well, I will share it with the community.

On Fri, Feb 10, 2023 at 4:26 PM Luca Canali  wrote:

> Hi Qian,
>
>
>
> Indeed the metrics available with the Prometheus servlet sink (which is
> still marked as experimental) are limited compared to the full
> instrumentation; this is due to the way it is implemented as a servlet,
> and it cannot be easily extended from what I can see.
>
> You can use another supported metrics sink (see
> https://spark.apache.org/docs/latest/monitoring.html#metrics ) if you
> want to collect all the metrics that are exposed by Spark executors.
>
> For example, I use the graphite sink and then collect metrics into an
> InfluxDB instance (see https://github.com/cerndb/spark-dashboard )
>
> An additional comment is that there is room for having more sinks
> available for Apache Spark metrics, notably for InfluxDB and for Prometheus
> (gateway), if someone is interested in working on that.
>
>
>
> Best,
>
> Luca
>
>
>
>
>
> *From:* Qian Sun 
> *Sent:* Friday, February 10, 2023 05:05
> *To:* dev ; user.spark 
> *Subject:* Executor metrics are missing on prometheus sink
>
>
>
> Setting up the Prometheus sink in this way:
>
> -c spark.ui.prometheus.enabled=true
>
> -c spark.executor.processTreeMetrics.enabled=true
>
> -c spark.metrics.conf=/spark/conf/metric.properties
>
> *metric.properties:*
>
> *.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
>
> *.sink.prometheusServlet.path=/metrics/prometheus
>
> Result:
>
> Both of these endpoints have some metrics
>
> :4040/metrics/prometheus
>
> :4040/metrics/executors/prometheus
>
>
>
> But the executor one is missing the metrics under the executor namespace
> described here:
>
>
> https://spark.apache.org/docs/latest/monitoring.html#component-instance--executor
>
>
>
> *How do I expose executor metrics on the Spark executor pods?*
>
>
>
> *Any help will be appreciated.*
>
> --
>
> Regards,
>
> Qian Sun
>


-- 
Regards,
Qian Sun
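
For reference, the Graphite sink Luca mentions is configured through the same metrics properties file as the Prometheus servlet. A minimal sketch, assuming the standard GraphiteSink options from the Spark monitoring docs (the host, port, and prefix values are placeholders, not taken from this thread):

```
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=graphite.example.com
*.sink.graphite.port=2003
*.sink.graphite.period=10
*.sink.graphite.unit=seconds
*.sink.graphite.prefix=spark
# Optionally also expose JVM source metrics from the driver and executors
*.source.jvm.class=org.apache.spark.metrics.source.JvmSource
```

Metrics collected this way can then be forwarded to a backend such as InfluxDB, as in the spark-dashboard setup Luca links above.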


Running Spark on Kubernetes (GKE) - failing on spark-submit

2023-02-13 Thread karan alang
Hello All,

I'm trying to run a simple application on GKE (Kubernetes), and it is
failing.
Note: I have Spark (Bitnami Spark chart) installed on GKE using helm
install.

Here is what I have done:
1. Created a Docker image using the Dockerfile below.

Dockerfile:
```

FROM python:3.7-slim

RUN apt-get update && \
    apt-get install -y default-jre && \
    apt-get install -y openjdk-11-jre-headless && \
    apt-get clean

ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64

RUN pip install pyspark
RUN mkdir -p /myexample && chmod 755 /myexample
WORKDIR /myexample

COPY src/StructuredStream-on-gke.py /myexample/StructuredStream-on-gke.py

CMD ["pyspark"]

```
Simple PySpark application:
```

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StructuredStreaming-on-gke").getOrCreate()

data = [('k1', 123000), ('k2', 234000), ('k3', 456000)]
df = spark.createDataFrame(data, ('id', 'salary'))

df.show(5, False)

```

spark-submit command:
```

spark-submit --master k8s://https://34.74.22.140:7077 \
  --deploy-mode cluster \
  --name pyspark-example \
  --conf spark.kubernetes.container.image=pyspark-example:0.1 \
  --conf spark.kubernetes.file.upload.path=/myexample \
  src/StructuredStream-on-gke.py
```

Error I get:
```

23/02/13 13:18:27 INFO KubernetesUtils: Uploading file: /Users/karanalang/PycharmProjects/Kafka/pyspark-docker/src/StructuredStream-on-gke.py to dest: /myexample/spark-upload-12228079-d652-4bf3-b907-3810d275124a/StructuredStream-on-gke.py...

Exception in thread "main" org.apache.spark.SparkException: Uploading file /Users/karanalang/PycharmProjects/Kafka/pyspark-docker/src/StructuredStream-on-gke.py failed...
    at org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileUri(KubernetesUtils.scala:296)
    at org.apache.spark.deploy.k8s.KubernetesUtils$.renameMainAppResource(KubernetesUtils.scala:270)
    at org.apache.spark.deploy.k8s.features.DriverCommandFeatureStep.configureForPython(DriverCommandFeatureStep.scala:109)
    at org.apache.spark.deploy.k8s.features.DriverCommandFeatureStep.configurePod(DriverCommandFeatureStep.scala:44)
    at org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder.$anonfun$buildFromFeatures$3(KubernetesDriverBuilder.scala:59)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:89)
    at org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder.buildFromFeatures(KubernetesDriverBuilder.scala:58)
    at org.apache.spark.deploy.k8s.submit.Client.run(KubernetesClientApplication.scala:106)
    at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$3(KubernetesClientApplication.scala:213)
    at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$3$adapted(KubernetesClientApplication.scala:207)
    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2622)
    at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.run(KubernetesClientApplication.scala:207)
    at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.start(KubernetesClientApplication.scala:179)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Error uploading file StructuredStream-on-gke.py
    at org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileToHadoopCompatibleFS(KubernetesUtils.scala:319)
    at org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileUri(KubernetesUtils.scala:292)
    ... 21 more
Caused by: java.io.IOException: Mkdirs failed to create /myexample/spark-upload-12228079-d652-4bf3-b907-3810d275124a
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:317)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:305)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:987)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:414)
    at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:387)
    at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:2369)
    at org.apache.hadoop.fs.FilterFileSystem.copyFromLocalFile(FilterFileSystem.java:368)
    at org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileToHadoopCompatibleFS(KubernetesUtils.scala:316)
    ... 22 more
```

Any ideas on how to fix this and get it to work?
Thanks in advance!

Please see the Stack Overflow link:

https://stackoverflow.com/questions/75441360/running-spark-application-on-gke-failing-on-spark-submit
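
For reference, the innermost cause ("Mkdirs failed to create /myexample/spark-upload-...") comes from uploadFileToHadoopCompatibleFS: spark-submit is trying to create the spark.kubernetes.file.upload.path directory on the local filesystem of the submitting machine rather than on a shared, Hadoop-compatible filesystem. A hedged sketch of a submit command under that assumption (the API server endpoint and the gs:// bucket are placeholders, not values from this thread; a gs:// path also requires the GCS connector to be available to spark-submit, and s3a:// or hdfs:// paths work the same way):

```
spark-submit \
  --master k8s://https://<k8s-apiserver-host>:<port> \
  --deploy-mode cluster \
  --name pyspark-example \
  --conf spark.kubernetes.container.image=pyspark-example:0.1 \
  --conf spark.kubernetes.file.upload.path=gs://<some-bucket>/spark-uploads \
  src/StructuredStream-on-gke.py
```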


[Spark Core] Spark data loss/data duplication when executors die

2023-02-13 Thread Erik Eklund
Hi,

We are facing this issue when we convert an RDD to a Dataset, followed by
repartition + write. We are using spot instances on k8s, which means they can
die at any moment, and when they do during this phase we very often see data
duplication.

Pseudo job code:

val rdd = data.map(…)
val ds = spark.createDataset(rdd)(classEncoder)

ds.repartition(N)
  .write
  .format("parquet")
  .mode("overwrite")
  .save(path)

If I kill an executor pod during the repartition stage I can reproduce the 
issue. If I instead move the repartition to happen on the rdd instead of the 
dataset I cannot reproduce the issue.
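
For concreteness, a minimal sketch of that workaround (repartitioning at the RDD level before the conversion), following the variable names of the pseudo code above; illustrative only:

```
val repartitionedRdd = data.map(…).repartition(N)
val ds = spark.createDataset(repartitionedRdd)(classEncoder)

ds.write
  .format("parquet")
  .mode("overwrite")
  .save(path)
```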

Is this a bug in spark lineage when going from rdd -> ds/df -> repartition when 
an executor drops? There is no randomness in the map function on the rdd before 
you ask 😊

Thanks,
Erik


Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-13 Thread sam smith
Alright, this is the working Java version of it:

List<Column> listCols = new ArrayList<>();
Arrays.asList(dataset.columns()).forEach(column -> {
    listCols.add(org.apache.spark.sql.functions.collect_set(column));
});
Column[] arrCols = listCols.toArray(new Column[listCols.size()]);
dataset = dataset.select(arrCols);


But then I tried to explode the sets of values into rows through explode(),
and the column values repeat to fill the size of the largest column.

How can I set the repeated values to null instead (thus keeping only one
exploded set of column values in each column)?

Thanks.
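
For reference, a minimal sketch of the approach Sean and Enrico describe in the quoted thread below (cache the DataFrame, then run the per-column distinct() jobs from parallel threads). The helper name is illustrative, and this assumes Scala 2.12, where parallel collections are in the standard library:

```
import org.apache.spark.sql.DataFrame

// One distinct() job per column, launched from parallel threads so the
// Spark jobs overlap; cache first so the input is not re-read per column.
def distinctValuesPerColumn(df: DataFrame): Map[String, Array[Any]] = {
  df.cache()
  df.columns.toList.par
    .map(c => c -> df.select(c).distinct().collect().map(_.get(0)))
    .toList.toMap
}
```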

On Sun, Feb 12, 2023 at 10:43 PM Enrico Minack  wrote:

> @Sean: This aggregate function does work without an explicit groupBy():
>
> ./spark-3.3.1-bin-hadoop2/bin/spark-shell
> Spark context Web UI available at http://*:4040
> Spark context available as 'sc' (master = local[*], app id = local-1676237726079).
> Spark session available as 'spark'.
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 3.3.1
>       /_/
>
> Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.17)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
> scala> val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10, "one")).toDF("a", "b", "c")
> scala> df.select(df.columns.map(column => collect_set(col(column)).as(column)): _*).show()
> +------------+--------+----------+
> |           a|       b|         c|
> +------------+--------+----------+
> |[1, 2, 3, 4]|[20, 10]|[one, two]|
> +------------+--------+----------+
>
> @Sam: I haven't tested the Java code, sorry. I presume you can work it out
> from the working Scala code.
>
> Enrico
>
>
> On 12 Feb 2023 at 21:32, Sean Owen wrote:
>
> It doesn't work because it's an aggregate function. You have to groupBy()
> (group by nothing) to make that work, but you can't assign that as a
> column. Folks, those approaches don't make sense semantically in SQL or
> Spark or anything.
> They just mean: use threads in your program to collect() distinct values
> for each column in parallel. You don't have to, but you could.
> What else are we looking for here? The answer has been given a number of
> times, I think.
>
>
> On Sun, Feb 12, 2023 at 2:28 PM sam smith 
> wrote:
>
>> OK, what do you mean by "do your outer for loop in parallel"?
>> By the way, this didn't work:
>> for (String columnName : df.columns()) {
>> df= df.withColumn(columnName,
>> collect_set(col(columnName)).as(columnName));
>> }
>>
>>
>> On Sun, Feb 12, 2023 at 8:36 PM Enrico Minack  wrote:
>>
>>> That is unfortunate, but 3.4.0 is around the corner, really!
>>>
>>> Well, then based on your code, I'd suggest two improvements:
>>> - cache your dataframe after reading, this way, you don't read the
>>> entire file for each column
>>> - do your outer for loop in parallel, then you have N parallel Spark
>>> jobs (only helps if your Spark cluster is not fully occupied by a single
>>> column)
>>>
>>> Your withColumn approach does not work because withColumn expects a
>>> column as the second argument, but df.select(columnName).distinct() is a
>>> DataFrame, and .col is a column in *that* DataFrame; it is not a column
>>> of the DataFrame that you call withColumn on.
>>>
>>> It should read:
>>>
>>> Scala:
>>> df.select(df.columns.map(column => collect_set(col(column)).as(column)):
>>> _*).show()
>>>
>>> Java:
>>> List<Column> listCols = new ArrayList<>();
>>> Arrays.asList(df.columns()).forEach(column ->
>>>     listCols.add(collect_set(col(column)).as(column)));
>>> df = df.select(listCols.toArray(new Column[0]));
>>>
>>> Then you have a single DataFrame that computes all columns in a single
>>> Spark job.
>>>
>>> But this reads all distinct values into a single partition, which has
>>> the same downside as collect, so this is as bad as using collect.
>>>
>>> Cheers,
>>> Enrico
>>>
>>>
>>> On 12 Feb 2023 at 18:05, sam smith wrote:
>>>
>>> @Enrico Minack  Thanks for "unpivot", but I am
>>> using version 3.3.0 (you are taking it way too far, as usual :) ).
>>> @Sean Owen  Please then show me how it can be improved
>>> in code.
>>>
>>> Also, why doesn't such an approach (using withColumn()) work:
>>>
>>> for (String columnName : df.columns()) {
>>> df= df.withColumn(columnName,
>>> df.select(columnName).distinct().col(columnName));
>>> }
>>>
>>> On Sat, Feb 11, 2023 at 1:11 PM Enrico Minack  wrote:
>>>
 You could do the entire thing in DataFrame world and write the result
 to disk. All you need is unpivot (to be released in Spark 3.4.0, soon).

 Note this is Scala but should be straightforward to translate into Java:

 import org.apache.spark.sql.functions.collect_set

 val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10, 123)).toDF("a", "b", "c")

 df.unpivot(Array.empty, "column", "value")
   .groupBy("column")
   .agg(collect_set("value").as("distinct_values"))

 The unpivot opera