RE: Spark on Java 17

2023-12-09 Thread Luca Canali
Hi Faiz,

We find G1GC works well for some of our workloads that are Parquet-read 
intensive, and we have been using G1GC with Spark on Java 8 already 
(spark.driver.extraJavaOptions and spark.executor.extraJavaOptions set to 
“-XX:+UseG1GC”), while currently we are mostly running Spark (3.3 and higher) 
on Java 11.
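For reference, this is roughly how we pass those options on the command line (my_app.py is just a placeholder for your application):

spark-submit \
  --conf "spark.driver.extraJavaOptions=-XX:+UseG1GC" \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \
  my_app.py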
However, it is always best to refer to measurements of your specific 
workloads; let me know if you find something different.
BTW, besides the Web UI, I typically also measure GC time with a couple of custom 
tools: https://github.com/cerndb/spark-dashboard and 
https://github.com/LucaCanali/sparkMeasure
A few microbenchmark tests of Spark reading Parquet with different JDKs are at: 
https://db-blog.web.cern.ch/node/192

Best,
Luca


From: Faiz Halde 
Sent: Thursday, December 7, 2023 23:25
To: user@spark.apache.org
Subject: Spark on Java 17

Hello,

We are planning to switch to Java 17 for Spark and were wondering if there are 
any obvious learnings from anybody related to JVM tuning.

We've been running on Java 8 for a while now and used to use the parallel GC, as 
that used to be a general recommendation for high-throughput systems. How has 
the default G1GC worked out with Spark?

Thanks
Faiz


RE: Profiling PySpark Pandas UDF

2022-08-29 Thread Luca Canali
Hi Abdeali,

 

Thanks for the support. Indeed, you can go ahead and test and review my latest 
PR for SPARK-34265 (Instrument Python UDF execution using SQL Metrics) if you 
want to: https://github.com/apache/spark/pull/33559



Currently I have reduced the scope of the instrumentation to just 3 simple metrics: 
"data sent to Python workers", "data returned from Python workers", and 
"number of output rows".
In a previous attempt I had also instrumented the time for UDF execution, although 
there are some subtle points there, and I may need to go back to testing that at a 
later stage.



It definitely would be good to know if people using PySpark and Python UDFs 
find this proposed improvement useful.

I see the proposed additional instrumentation as complementary to the 
Python/Pandas UDF Profiler introduced in Spark 3.3.
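As a side note, a minimal sketch of how that profiler can be used (assuming Spark 3.3 or higher; the UDF below is only an illustration):

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

# spark.python.profile must be set when the SparkContext is created
spark = (
    SparkSession.builder
    .config("spark.python.profile", "true")
    .getOrCreate()
)

@pandas_udf("long")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.range(100).select(add_one("id")).collect()
spark.sparkContext.show_profiles()  # prints cProfile-style stats per UDF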

 

Best,

Luca

 

From: Abdeali Kothari  
Sent: Friday, August 26, 2022 15:59
To: Luca Canali 
Cc: Russell Jurney ; Gourav Sengupta 
; Sean Owen ; Takuya UESHIN 
; user ; Subash Prabanantham 

Subject: Re: Profiling PySpark Pandas UDF

 

Hi Luca, I see you pushed some code to the PR 3 hrs ago.

That's awesome. If I can help out in any way, do let me know.

I think that's an amazing feature and it would be great if it can get into Spark.

 

On Fri, 26 Aug 2022, 12:41 Luca Canali <luca.can...@cern.ch> wrote:

@Abdeali as for “lightweight profiling”, there is some work in progress on 
instrumenting Python UDFs with Spark metrics, see 
https://issues.apache.org/jira/browse/SPARK-34265  

However it is a bit stuck at the moment, and needs to be revived I believe.  

 

Best,

Luca

 

From: Abdeali Kothari <abdealikoth...@gmail.com>
Sent: Friday, August 26, 2022 06:36
To: Subash Prabanantham <subashpraba...@gmail.com>
Cc: Russell Jurney <russell.jur...@gmail.com>; Gourav Sengupta <gourav.sengu...@gmail.com>; Sean Owen <sro...@gmail.com>; Takuya UESHIN <ues...@happy-camper.st>; user <user@spark.apache.org>
Subject: Re: Profiling PySpark Pandas UDF

 

The Python profiler is pretty cool!

I'll try it out to see what could be taking time within the UDF.

 

I'm wondering if there is also some lightweight profiling (which does not slow 
down my processing) for me to get:

 

 - how much time the UDF took (like how much time was spent inside the UDF)

 - how many times the UDF was called 

 

I can see the overall time a stage took in the Spark UI - would be cool if I 
could find the time a UDF takes too

 

On Fri, 26 Aug 2022, 00:25 Subash Prabanantham <subashpraba...@gmail.com> wrote:

Wow, lots of good suggestions. I didn’t know about the profiler either. Great 
suggestion @Takuya. 

 

 

Thanks,

Subash

 

On Thu, 25 Aug 2022 at 19:30, Russell Jurney <russell.jur...@gmail.com> wrote:

YOU know what you're talking about and aren't hacking a solution. You are my 
new friend :) Thank you, this is incredibly helpful!




 

Thanks,

Russell Jurney | @rjurney <http://twitter.com/rjurney> | russell.jur...@gmail.com | LI <http://linkedin.com/in/russelljurney> | FB <http://facebook.com/jurney> | datasyndrome.com

 

 

On Thu, Aug 25, 2022 at 10:52 AM Takuya UESHIN <ues...@happy-camper.st> wrote:

Hi Subash,

Have you tried the Python/Pandas UDF Profiler introduced in Spark 3.3?
- 
https://spark.apache.org/docs/latest/api/python/development/debugging.html#python-pandas-udf

Hope it can help you.

Thanks.

 

On Thu, Aug 25, 2022 at 10:18 AM Russell Jurney <russell.jur...@gmail.com> wrote:

Subash, I’m here to help :)

 

I started a test script to demonstrate a solution last night but got a cold and 
haven’t finished it. Give me another day and I’ll get it to you. My suggestion 
is that you run PySpark locally in pytest with a fixture to generate and yield 
your SparkContext and SparkSession, and then write tests that load some test 
data, perform some count operation and checkpoint to ensure that data is 
loaded, start a timer, run your UDF on the DataFrame, checkpoint again or write 
some output to disk to make sure it finishes, and then stop the timer and 
compute how long it takes. I’ll show you some code; I have to do this for 
Graphlet AI’s RTL utils and other tools to figure out how much overhead there 
is using Pandera and Spark together to validate data: 
https://github.com/Graphlet-AI/graphlet

 

I’ll respond by tomorrow evening with code in a fist! We’ll see if it gets 
consistent, measurable and valid results! :)

 

Russell Jurney

 

On Thu, Aug 25, 2022 at 10:00 AM Sean Owen <sro...@gmail.com> wrote:

It's important to realize that while pandas UDFs and pandas on Spark are both 
related to pandas, they are not themselves directly related.

RE: Profiling PySpark Pandas UDF

2022-08-26 Thread Luca Canali
@Abdeali as for “lightweight profiling”, there is some work in progress on 
instrumenting Python UDFs with Spark metrics, see 
https://issues.apache.org/jira/browse/SPARK-34265  

However it is a bit stuck at the moment, and needs to be revived I believe.  

 

Best,

Luca

 

From: Abdeali Kothari  
Sent: Friday, August 26, 2022 06:36
To: Subash Prabanantham 
Cc: Russell Jurney ; Gourav Sengupta 
; Sean Owen ; Takuya UESHIN 
; user 
Subject: Re: Profiling PySpark Pandas UDF

 

The Python profiler is pretty cool!

I'll try it out to see what could be taking time within the UDF.

 

I'm wondering if there is also some lightweight profiling (which does not slow 
down my processing) for me to get:

 

 - how much time the UDF took (like how much time was spent inside the UDF)

 - how many times the UDF was called 

 

I can see the overall time a stage took in the Spark UI - would be cool if I 
could find the time a UDF takes too

 

On Fri, 26 Aug 2022, 00:25 Subash Prabanantham <subashpraba...@gmail.com> wrote:

Wow, lots of good suggestions. I didn’t know about the profiler either. Great 
suggestion @Takuya. 

 

 

Thanks,

Subash

 

On Thu, 25 Aug 2022 at 19:30, Russell Jurney <russell.jur...@gmail.com> wrote:

YOU know what you're talking about and aren't hacking a solution. You are my 
new friend :) Thank you, this is incredibly helpful!




 

Thanks,

Russell Jurney | @rjurney <http://twitter.com/rjurney> | russell.jur...@gmail.com | LI <http://linkedin.com/in/russelljurney> | FB <http://facebook.com/jurney> | datasyndrome.com

 

 

On Thu, Aug 25, 2022 at 10:52 AM Takuya UESHIN <ues...@happy-camper.st> wrote:

Hi Subash,

Have you tried the Python/Pandas UDF Profiler introduced in Spark 3.3?
- 
https://spark.apache.org/docs/latest/api/python/development/debugging.html#python-pandas-udf

Hope it can help you.

Thanks.

 

On Thu, Aug 25, 2022 at 10:18 AM Russell Jurney <russell.jur...@gmail.com> wrote:

Subash, I’m here to help :)

 

I started a test script to demonstrate a solution last night but got a cold and 
haven’t finished it. Give me another day and I’ll get it to you. My suggestion 
is that you run PySpark locally in pytest with a fixture to generate and yield 
your SparkContext and SparkSession, and then write tests that load some test 
data, perform some count operation and checkpoint to ensure that data is 
loaded, start a timer, run your UDF on the DataFrame, checkpoint again or write 
some output to disk to make sure it finishes, and then stop the timer and 
compute how long it takes. I’ll show you some code; I have to do this for 
Graphlet AI’s RTL utils and other tools to figure out how much overhead there 
is using Pandera and Spark together to validate data: 
https://github.com/Graphlet-AI/graphlet

 

I’ll respond by tomorrow evening with code in a fist! We’ll see if it gets 
consistent, measurable and valid results! :)

 

Russell Jurney

 

On Thu, Aug 25, 2022 at 10:00 AM Sean Owen <sro...@gmail.com> wrote:

It's important to realize that while pandas UDFs and pandas on Spark are both 
related to pandas, they are not themselves directly related. The first lets you 
use pandas within Spark, the second lets you use pandas on Spark. 

 

Hard to say with this info but you want to look at whether you are doing 
something expensive in each UDF call and consider amortizing it with the scalar 
iterator UDF pattern. Maybe. 
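For reference, a rough sketch of that scalar iterator pandas UDF pattern (the offset here just stands in for whatever expensive one-time setup you may have, e.g. loading a model):

from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def plus_offset(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    offset = 1000.0              # stand-in for expensive setup, done once per task
    for s in batches:
        yield s + offset         # per-batch work reuses the initialized state

# example usage:
# spark.range(10).selectExpr("cast(id as double) as value").select(plus_offset("value")).show()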

 

A pandas UDF is not Spark code itself, so no, there is no tool in Spark to 
profile it. Conversely, any approach to profiling pandas or Python would work 
here.

 

On Thu, Aug 25, 2022, 11:22 AM Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

Hi,

 

Maybe I am jumping to conclusions and making stupid guesses, but have you 
tried Koalas now that it is natively integrated with PySpark?

 

Regards 

Gourav

 

On Thu, 25 Aug 2022, 11:07 Subash Prabanantham <subashpraba...@gmail.com> wrote:

Hi All,

 

I was wondering if we have any best practices on using pandas UDFs? Profiling a 
UDF is not an easy task, and our case requires some drilling down into the logic 
of the function.

 

 

Our use case:

We are using func(DataFrame) => DataFrame as the interface to the Pandas UDF. When 
running only the function locally it runs fast, but when executed in the Spark 
environment the processing time is more than expected. We have one column 
where the value is large (BinaryType -> 600KB); could this make the Arrow 
computation slower?

 

Is there any profiling tool or best way to debug the cost incurred using pandas UDFs?

 

 

Thanks,

Subash

 

-- 

 

Thanks,

Russell Jurney | @rjurney <http://twitter.com/rjurney> | russell.jur...@gmail.com | LI <http://linkedin.com/in/russelljurney> | FB <http://facebook.com/jurney> | datasyndrome.com




 

-- 

Takuya UESHIN



[no subject]

2022-02-24 Thread Luca Borin
Unsubscribe


RE: measure running time

2021-12-23 Thread Luca Canali
Hi Mich,

 

With Spark 3.1.1 you need to use spark-measure built with Scala 2.12:  

 

bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17
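Once the package is loaded, a minimal usage sketch looks roughly like this (API names as I recall them from the sparkMeasure docs; the SQL statement is just a placeholder workload):

from sparkmeasure import StageMetrics

stagemetrics = StageMetrics(spark)
stagemetrics.begin()
spark.sql("select count(*) from range(1000000)").show()  # your workload here
stagemetrics.end()
stagemetrics.print_report()  # elapsed time plus aggregated task metrics (CPU, GC, shuffle, ...)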

 

Best,

Luca

 

From: Mich Talebzadeh  
Sent: Thursday, December 23, 2021 19:59
To: Luca Canali 
Cc: user 
Subject: Re: measure running time

 

Hi Luca,

 

Have you tested this link  https://github.com/LucaCanali/sparkMeasure

 

With Spark 3.1.1/PySpark,   I am getting this error 

 

 

pyspark --packages ch.cern.sparkmeasure:spark-measure_2.11:0.17

 

:: problems summary ::

 ERRORS

unknown resolver null

 

SERVER ERROR: Bad Gateway 
url=https://dl.bintray.com/spark-packages/maven/com/fasterxml/jackson/jackson-bom/2.9.9/jackson-bom-2.9.9.jar

 

SERVER ERROR: Bad Gateway 
url=https://dl.bintray.com/spark-packages/maven/com/fasterxml/jackson/jackson-base/2.9.9/jackson-base-2.9.9.jar

 

Using Python version 3.7.3 (default, Mar 27 2019 22:11:17)

Spark context Web UI available at http://rhes76:4040

Spark context available as 'sc' (master = local[*], app id = 
local-1640285629478).

SparkSession available as 'spark'.

 

>>> from sparkmeasure import StageMetrics

>>> stagemetrics = StageMetrics(spark)

Traceback (most recent call last):

  File "<stdin>", line 1, in <module>

  File 
"/home/hduser/anaconda3/envs/pyspark_venv/lib/python3.7/site-packages/sparkmeasure/stagemetrics.py",
 line 15, in __init__

self.stagemetrics = 
self.sc._jvm.ch.cern.sparkmeasure.StageMetrics(self.sparksession._jsparkSession)

  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 
1569, in __call__

  File "/opt/spark/python/pyspark/sql/utils.py", line 111, in deco

return f(*a, **kw)

  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, 
in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling 
None.ch.cern.sparkmeasure.StageMetrics.

: java.lang.NoClassDefFoundError: scala/Product$class

at ch.cern.sparkmeasure.StageMetrics.<init>(stagemetrics.scala:111)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)

at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

at py4j.Gateway.invoke(Gateway.java:238)

at 
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)

at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)

at py4j.GatewayConnection.run(GatewayConnection.java:238)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.ClassNotFoundException: scala.Product$class

at java.net.URLClassLoader.findClass(URLClassLoader.java:382)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

... 12 more

 

Thanks

 

 

   view my Linkedin profile 
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> 

 

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 

 

 

 

On Thu, 23 Dec 2021 at 15:41, Luca Canali <luca.can...@cern.ch> wrote:

Hi,

 

I agree with Gourav that just measuring execution time is a simplistic approach 
that may lead you to miss important details, in particular when running 
distributed computations.

WebUI, REST API, and metrics instrumentation in Spark can be quite useful for 
further drill down. See https://spark.apache.org/docs/latest/monitoring.html

You can also have a look at this tool that takes care of automating collecting 
and aggregating some executor task metrics: 
https://github.com/LucaCanali/sparkMeasure

 

Best,

Luca

 

From: Gourav Sengupta <gourav.sengu...@gmail.com>
Sent: Thursday, December 23, 2021 14:23
To: bit...@bitfox.top
Cc: user <user@spark.apache.org>
Subject: Re: measure running time

 

Hi,

 

I do not think that such time comparisons make any sense at all in distributed 
computation. Just saying that an operation in RDD and Dataframe can be compared 
based on their start and stop time may not provide any valid information.

 

You will have to look into the details of timing and the steps. For example, 
please look at the Spark UI to see how timings are calculated in distributed 
computing mode; there are several well written papers on this.

RE: measure running time

2021-12-23 Thread Luca Canali
Hi,

 

I agree with Gourav that just measuring execution time is a simplistic approach 
that may lead you to miss important details, in particular when running 
distributed computations.

WebUI, REST API, and metrics instrumentation in Spark can be quite useful for 
further drill down. See https://spark.apache.org/docs/latest/monitoring.html

You can also have a look at this tool that takes care of automating collecting 
and aggregating some executor task metrics: 
https://github.com/LucaCanali/sparkMeasure

 

Best,

Luca

 

From: Gourav Sengupta  
Sent: Thursday, December 23, 2021 14:23
To: bit...@bitfox.top
Cc: user 
Subject: Re: measure running time

 

Hi,

 

I do not think that such time comparisons make any sense at all in distributed 
computation. Just saying that an operation in RDD and Dataframe can be compared 
based on their start and stop time may not provide any valid information.

 

You will have to look into the details of timing and the steps. For example, 
please look at the Spark UI to see how timings are calculated in distributed 
computing mode; there are several well written papers on this.

 

 

Thanks and Regards,

Gourav Sengupta

 

 

 

 

 

On Thu, Dec 23, 2021 at 10:57 AM <bit...@bitfox.top> wrote:

hello community,

In PySpark, how can I measure the running time of a command?
I just want to compare the running time of the RDD API and the DataFrame 
API, as in this blog of mine:
https://bitfoxtop.wordpress.com/2021/12/23/count-email-addresses-using-sparks-rdd-and-dataframe/

I tried spark.time(), but it doesn't work.
Thank you.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: How to estimate the executor memory size according by the data

2021-12-23 Thread Luca Canali
Hi Arthur,

If you are using Spark 3.x you can use executor metrics for memory 
instrumentation.
Metrics are available on the WebUI, see 
https://spark.apache.org/docs/latest/web-ui.html#stage-detail (search for Peak 
execution memory).
Executor memory metrics are also available in the REST API and the Spark 
metrics system, see https://spark.apache.org/docs/latest/monitoring.html
Further information on the topic is also at 
https://db-blog.web.cern.ch/blog/luca-canali/2020-08-spark3-memory-monitoring
  
Best,
Luca

-Original Message-
From: Arthur Li  
Sent: Thursday, December 23, 2021 15:11
To: user@spark.apache.org
Subject: How to estimate the executor memory size according by the data

Dear experts,

Recently there have been some OOM issues in my demo jobs, which consume data from 
the Hive database. I know I can increase the executor memory size to eliminate 
the OOM error, but I don't know how to assess the executor memory requirement or 
how to automatically adapt the executor memory size to the data size.

Any advice is appreciated.
Arthur Li

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: Spark 3.0 plugins

2021-12-20 Thread Luca Canali
Hi Anil,

 

To recap: Apache Spark plugins are an interface and configuration that allow 
injecting code at executor start-up and, among other things, provide a hook to 
the Spark metrics system. This provides a way to extend metrics collection 
beyond what is available in Apache Spark.

Instrumenting some parts of the Spark workload with plugins provides additional 
flexibility compared to instrumentation committed in the Apache Spark code: only 
users who want to activate it do so, and they can tune a configuration customized 
for their environment, which would not necessarily be suitable for all possible 
uses of Apache Spark.
 

The repository https://github.com/cerndb/SparkPlugins that you mentioned 
provides code for a few Spark plugins that I developed and found useful, 
including plugins for measuring (some) I/O metrics.

At present this is “third-party code” that you are most welcome to use, although 
it is not yet part of the Apache Spark project. It may end up there, perhaps as a 
set of examples, if more people find this type of instrumentation useful.
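As an illustration only, activating one of those plugins boils down to a couple of configuration settings, along these lines (the package coordinates and the plugin class name below are placeholders, please check the repository README for the actual values):

bin/spark-shell \
  --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.1 \
  --conf spark.plugins=ch.cern.HDFSMetrics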
 

 

In your mail you referenced the DATA+AI Summit talk “What is New with Apache 
Spark Performance Monitoring in Spark 3.0” 
<https://databricks.com/session_eu20/what-is-new-with-apache-spark-performance-monitoring-in-spark-3-0>; 
you can also find additional work on this in the DATA+AI Summit 2021 talk 
“Monitor Apache Spark 3 on Kubernetes using Metrics and Plugins” 
<https://databricks.com/session_na21/monitor-apache-spark-3-on-kubernetes-using-metrics-and-plugins>

 

Best,

Luca

 

From: Anil Dasari  
Sent: Monday, December 20, 2021 07:02
To: user@spark.apache.org
Subject: Spark 3.0 plugins

 

Hello everyone,

 

I was going through Apache Spark Performance Monitoring in Spark 3.0 
<https://www.youtube.com/watch?v=WFXzoRalwSg>  talk and wanted to collect IO 
metrics for my spark application. 

Couldn’t find Spark 3.0 built-in plugins for IO metrics like 
https://github.com/cerndb/SparkPlugins  in Spark 3 documentation. Does spark 3 
bundle have in-built IO metric plugins ? Thanks in advance.

 

Regards,

Anil

 



RE: Spark Prometheus Metrics for Executors Not Working

2021-05-24 Thread Luca Canali
The PrometheusServlet adds a servlet within the existing Spark UI to serve 
metrics data in Prometheus format.
Similarly to what happens with the MetricsServlet, the Prometheus servlet does 
not work on executors, as executors do not have a Spark UI endpoint to which 
the servlet could attach.
Currently Apache Spark does not have an official Prometheus sink for the 
metrics system.
Spark has a Graphite sink that can be used with InfluxDB 1.x, which could be 
useful in this context.
I also see that a Prometheus sink is available as an external library from an 
open source project.
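For what concerns the Graphite sink, the configuration in metrics.properties is along these lines (the host value is just an example, point it at your Graphite/InfluxDB endpoint):

*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=influxdb-host
*.sink.graphite.port=2003
*.sink.graphite.period=10
*.sink.graphite.unit=seconds
*.sink.graphite.prefix=spark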

Best,
Luca

-Original Message-
From: paulp  
Sent: Monday, May 24, 2021 17:09
To: user@spark.apache.org
Subject: Spark Prometheus Metrics for Executors Not Working

Hi, 

recently our team has evaluated the prometheusServlet configuration in order to 
have Spark master, worker, driver and executor native Prometheus metrics.
We are running a Spark streaming standalone cluster version 3.0.1 on physical 
servers.

metrics.properties

*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
master.sink.prometheusServlet.path=/metrics/master/prometheus
applications.sink.prometheusServlet.path=/metrics/applications/prometheus

I'm able to see Spark master, worker and driver metrics but cannot see any 
spark executor metrics (curling the relevant Prometheus endpoints just results 
in empty result).

For the purposes of executor metrics verification, I have taken one of our jobs 
and added the following parameters to the job configuration. These are the 
parameters I pass alongside our spark-submit command:

--conf spark.ui.prometheus.enabled=true \ --conf 
spark.executor.processTreeMetrics.enabled=true

Spark official documentation mentions that Prometheus executor endpoint is 
still experimental which may explain this inconsistent behaviour.

Any help will be highly appreciated!



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: Understanding Executors UI

2021-01-08 Thread Luca Canali
You report 'Storage Memory': 3.3 TB / 598.5 GB -> The first number is the memory 
used for storage, the second one is the available memory (for storage) in the 
unified memory pool.
The used memory shown in your WebUI snippet is indeed quite high (higher than 
the available memory!?), so you can probably profit from drilling down on that to 
understand better what is happening.
For example, look at the details per executor (the numbers you reported are 
aggregated values), then also look at the “Storage” tab for a list of cached 
RDDs with details.
In any case, Spark 3.0 has improved memory instrumentation and improved 
instrumentation for streaming, so you can profit from testing there too.
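If it helps, the per-executor values can also be pulled programmatically from the REST API, along these lines (host, port and application id below are placeholders):

import json, urllib.request

url = "http://driver-host:4040/api/v1/applications/app-id/executors"
with urllib.request.urlopen(url) as resp:
    executors = json.load(resp)

for e in executors:
    # memoryUsed = storage memory in use, maxMemory = total memory available for storage
    print(e["id"], e["memoryUsed"], e["maxMemory"])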


From: Eric Beabes 
Sent: Friday, January 8, 2021 04:23
To: Luca Canali 
Cc: spark-user 
Subject: Re: Understanding Executors UI

So when I see this for 'Storage Memory': 3.3TB/ 598.5 GB - it's telling me that 
Spark is using 3.3 TB of memory & 598.5 GB is used for caching data, correct? 
What I am surprised about is that these numbers don't change at all throughout 
the day even though the load on the system is low after 5pm PST.

I would expect the "Memory used" to be lower than 3.3Tb after 5pm PST.

Does Spark 3.0 do a better job of memory management? Wondering if upgrading to 
Spark 3.0 would improve performance?


On Wed, Jan 6, 2021 at 2:29 PM Luca Canali <luca.can...@cern.ch> wrote:
Hi Eric,

A few links, in case they can be useful for your troubleshooting:

The Spark Web UI is documented in Spark 3.x documentation, although you can use 
most of it for Spark 2.4 too: https://spark.apache.org/docs/latest/web-ui.html

Spark memory management is documented at  
https://spark.apache.org/docs/latest/tuning.html#memory-management-overview
Additional resource: see also this diagram 
https://canali.web.cern.ch/docs/SparkExecutorMemory.png  and 
https://db-blog.web.cern.ch/blog/luca-canali/2020-08-spark3-memory-monitoring

Best,
Luca

From: Eric Beabes <mailinglist...@gmail.com>
Sent: Wednesday, January 6, 2021 00:20
To: spark-user <user@spark.apache.org>
Subject: Understanding Executors UI

[image.png]


Not sure if this image will go through. (Never sent an email to this mailing 
list with an image).

I am trying to understand this 'Executors' UI in Spark 2.4. I have a Stateful 
Structured Streaming job with 'State timeout' set to 10 minutes. When the load 
on the system is low a message gets written to Kafka immediately after the 
State times out BUT under heavy load it takes over 40 minutes to get a message 
on the output topic. Trying to debug this issue & see if performance can be 
improved.

Questions:

1) I am requesting 3.2 TB of memory but it seems the job keeps using only 598.5 
GB as per the values in 'Storage Memory' as well as 'On Heap Storage Memory'. 
Wondering if this is a Cluster issue OR am I not setting values correctly?
2) Where can I find documentation to understand different 'Tabs' in the Spark 
UI? (Sorry, Googling didn't help. I will keep searching.)

Any pointers would be appreciated. Thanks.



RE: Understanding Executors UI

2021-01-06 Thread Luca Canali
Hi Eric,

A few links, in case they can be useful for your troubleshooting:

The Spark Web UI is documented in Spark 3.x documentation, although you can use 
most of it for Spark 2.4 too: https://spark.apache.org/docs/latest/web-ui.html

Spark memory management is documented at  
https://spark.apache.org/docs/latest/tuning.html#memory-management-overview
Additional resource: see also this diagram 
https://canali.web.cern.ch/docs/SparkExecutorMemory.png  and 
https://db-blog.web.cern.ch/blog/luca-canali/2020-08-spark3-memory-monitoring

Best,
Luca

From: Eric Beabes 
Sent: Wednesday, January 6, 2021 00:20
To: spark-user 
Subject: Understanding Executors UI

[image.png]


Not sure if this image will go through. (Never sent an email to this mailing 
list with an image).

I am trying to understand this 'Executors' UI in Spark 2.4. I have a Stateful 
Structured Streaming job with 'State timeout' set to 10 minutes. When the load 
on the system is low a message gets written to Kafka immediately after the 
State times out BUT under heavy load it takes over 40 minutes to get a message 
on the output topic. Trying to debug this issue & see if performance can be 
improved.

Questions:

1) I am requesting 3.2 TB of memory but it seems the job keeps using only 598.5 
GB as per the values in 'Storage Memory' as well as 'On Heap Storage Memory'. 
Wondering if this is a Cluster issue OR am I not setting values correctly?
2) Where can I find documentation to understand different 'Tabs' in the Spark 
UI? (Sorry, Googling didn't help. I will keep searching.)

Any pointers would be appreciated. Thanks.



RE: Adding isolation level when reading from DB2 with spark.read

2020-09-02 Thread Luca Canali
Hi Filipa ,

Spark JDBC data source has the option to add a "sessionInitStatement".
Documented in https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html 
and https://issues.apache.org/jira/browse/SPARK-21519 
I guess you could use that option to "inject" a SET ISOLATION statement, 
although I am not familiar with the details of DB2.
Would that be useful for your use case?
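To make the idea concrete, an (untested) sketch in PySpark; the variable names are placeholders and the exact DB2 statement for setting the isolation level is an assumption on my side:

input_df = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", db_table)
    .option("user", username)
    .option("password", password)
    # executed once per JDBC connection, before any query is run
    .option("sessionInitStatement", "SET CURRENT ISOLATION = UR")
    .load()
)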

Best,
Luca

-Original Message-
From: Filipa Sousa  
Sent: Wednesday, September 2, 2020 16:34
To: user@spark.apache.org
Cc: Ana Sofia Martins 
Subject: Adding isolation level when reading from DB2 with spark.read

Hello,

We are trying to read from an IBM DB2 database using a pyspark job.
We have a requirement to add an isolation level - Read Uncommitted (WITH 
UR) to the JDBC queries when reading DB2 data.
We found "isolationLevel" parameter in Spark documentation, but apparently 
it seems like it only applies to writing 
(https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html). Do you know 
if there's a similar one for reading?

isolationLevel - The transaction isolation level, which applies to current 
connection. It can be one of NONE, READ_COMMITTED, READ_UNCOMMITTED, 
REPEATABLE_READ, or SERIALIZABLE, corresponding to standard transaction 
isolation levels defined by JDBC's Connection object, with default of 
READ_UNCOMMITTED. This option applies only to writing. Please refer the 
documentation in java.sql.Connection.

Also, we tested putting the "WITH UR" directly in the query, but since the 
isolation level must always be at the outermost layer of the query, and Spark 
always parenthesizes the query 
(https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html), it throws an 
error.

The last thing we tried was to add this predicates option when reading with 
Spark, but it is being ignored.
predicates = "PART_NR != '0' with UR"
input_df = (
self.spark.read.format("jdbc")
.option("url", self.db_settings["jdbc_url"])
.option("dbtable", db_table)
.option("user", self.db_settings["db_username"])
.option("password", self.db_settings["db_password"])
.option("predicates", predicates )
.option("fetchsize", self.fetch_size)
)

Do you have any advises on how can we do this?


Best regards,
Filipa Sousa


RE: Spark 3.0 using S3 taking long time for some set of TPC DS Queries

2020-08-24 Thread Luca Canali
Hi Abhishek,

Just a few ideas/comments on the topic:

When benchmarking/testing I find it useful to  collect a more complete view of 
resources usage and Spark metrics, beyond just measuring query elapsed time. 
Something like this:
https://github.com/cerndb/spark-dashboard

I'd rather not use dynamic allocation when benchmarking if possible, as it adds 
a layer of complexity when examining results.

If you suspect that reading from S3 vs. HDFS may play an important role on the 
performance you observe, you may want to drill down on that with a simple 
micro-benchmark, for example something like this (for Spark 3.0):

val df=spark.read.parquet("/TPCDS/tpcds_1500/store_sales")
df.write.format("noop").mode("overwrite").save

Best,
Luca

From: Rao, Abhishek (Nokia - IN/Bangalore) 
Sent: Monday, August 24, 2020 13:50
To: user@spark.apache.org
Subject: Spark 3.0 using S3 taking long time for some set of TPC DS Queries

Hi All,

We're doing some performance comparisons between Spark querying data on HDFS vs 
Spark querying data on S3 (Ceph Object Store used for S3 storage) using 
standard TPC-DS queries. We are observing that Spark 3.0 with S3 takes 
significantly longer for some sets of queries when compared with HDFS.
We also ran similar queries with Spark 2.4.5 querying data from S3, and we see 
that for this set of queries the time taken by Spark 2.4.5 is less than with 
Spark 3.0, which looks very strange.
Below are the details of 9 queries where Spark 3.0 takes >5 times longer to run 
on S3 when compared to Hadoop.

Environment Details:

  *   Spark running on Kubernetes
  *   TPC DS Scale Factor: 500 GB
  *   Hadoop 3.x
  *   Same CPU and memory used for all executions

Query | Spark 3.0 with S3 (seconds) | Spark 3.0 with Hadoop (seconds) | Spark 2.4.5 with S3 (seconds) | Spark 3.0 HDFS vs S3 (factor) | Spark 2.4.5 S3 vs Spark 3.0 S3 (factor) | Table involved
------|-----------------------------|---------------------------------|-------------------------------|-------------------------------|------------------------------------------|---------------
9     | 880.129                     | 106.109                         | 147.65                        | 8.294574                      | 5.960914                                 | store_sales
44    | 129.618                     | 23.747                          | 103.916                       | 5.458289                      | 1.247334                                 | store_sales
58    | 142.113                     | 20.996                          | 33.936                        | 6.768575                      | 4.187677                                 | store_sales
62    | 32.519                      | 5.425                           | 14.809                        | 5.994286                      | 2.195894                                 | web_sales
76    | 138.765                     | 20.73                           | 49.892                        | 6.693922                      | 2.781308                                 | store_sales
88    | 475.824                     | 48.2                            | 94.382                        | 9.871867                      | 5.04147                                  | store_sales
90    | 53.896                      | 6.804                           | 18.11                         | 7.921223                      | 2.976035                                 | web_sales
94    | 241.172                     | 43.49                           | 81.181                        | 5.545459                      | 2.970794                                 | web_sales
96    | 67.059                      | 10.396                          | 15.993                        | 6.450462                      | 4.193022                                 | store_sales


When we analysed it further, we saw that all these queries perform operations 
either on the store_sales or web_sales tables, and Spark 3 with S3 seems to 
download much more data from storage when compared to Spark 3 with Hadoop or 
Spark 2.4.5 with S3, which results in more time for query completion. I'm 
attaching the screenshots of the Driver UI for one such instance (Query 9) for 
reference.
Also attached are the Spark configurations (Spark 3.0) used for these tests.

We're not sure why Spark 3.0 on S3 is having this behaviour. Any inputs on what 
we're missing?

Thanks and Regards,
Abhishek



Spark 2.4.4, RPC encryption and Python

2020-01-16 Thread Luca Toscano
Hi everybody,

I am currently testing Spark 2.4.4 with the following new settings:

spark.authenticate   true
spark.io.encryption.enabled   true
spark.io.encryption.keySizeBits   256
spark.io.encryption.keygen.algorithm   HmacSHA256
spark.network.crypto.enabled   true
spark.network.crypto.keyFactoryAlgorithm   PBKDF2WithHmacSHA256
spark.network.crypto.keyLength   256
spark.network.crypto.saslFallback   false

I use dynamic allocation and the Spark shuffler is set correctly in
Yarn. I added the following two options to yarn-site.xml's config:

  
  spark.authenticate
  true
  

  
  spark.network.crypto.enabled
  true
  

This works very well in all the scala-based code (spark2-shell,
spark-submit, etc..) but it doesn't for Pyspark, since I do see the
following warnings repeating over and over:

20/01/14 10:23:50 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
Attempted to request executors before the AM has registered!
20/01/14 10:23:50 WARN ExecutorAllocationManager: Unable to reach the
cluster manager to request 1 total executors!

The culprit seems to be the option "spark.io.encryption.enabled=true",
since without it everything works fine.

At first I thought that it was a Yarn resource allocation problem, but
then I checked and the cluster has plenty of space. After digging a
bit more into Yarn's container logs and I discovered that it seems a
problem related to the Application master not being able to contact
the Driver in time (assuming client mode of course):

20/01/14 09:45:21 INFO ApplicationMaster: ApplicationAttemptId:
appattempt_1576771377404_19608_01
20/01/14 09:45:21 INFO YarnRMClient: Registering the ApplicationMaster
20/01/14 09:45:52 ERROR TransportClientFactory: Exception while
bootstrapping client after 30120 ms
java.lang.RuntimeException: java.util.concurrent.TimeoutException:
Timeout waiting for task.
at 
org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
at 
org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:263)
at 
org.apache.spark.network.crypto.AuthClientBootstrap.doSparkAuth(AuthClientBootstrap.java:105)
at 
org.apache.spark.network.crypto.AuthClientBootstrap.doBootstrap(AuthClientBootstrap.java:79)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:257)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
at 
org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Timeout waiting for task.
at 
org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:276)
at 
org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:96)
at 
org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:259)
... 11 more

The strange part is that sometimes the timeout doesn't occur, and
sometimes it does. I checked the code related to the above stacktrace
and ended up at:

https://github.com/apache/spark/blob/branch-2.4/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java#L106
https://github.com/apache/spark/blob/branch-2.4/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java#L129-L133

The "spark.network.auth.rpcTimeout" option seems to help, even if it
is not advertised in the docs as far as I can see (the 30s mentioned
in the exception are definitely triggered by this setting though). What
I am wondering is where/what I should check to debug this further,
since it seems a Python-only problem that doesn't affect Scala. I
didn't find any outstanding bugs, so given that 2.4.4 is very
recent I thought to report it here to ask for advice :)

Thanks in advance!

Luca

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Apache Spark Log4j logging applicationId

2019-07-23 Thread Luca Borin
Hi,

I would like to add the applicationId to all logs produced by Spark through
Log4j. Consider that I have a cluster with several jobs running in it, so
the presence of the applicationId would be useful to logically divide them.

I have found a partial solution. If I change the layout of the
PatternLayout logger, I can print the ThreadContext (see here
), which
can be used to add the applicationId information through MDC (see
here
).
This works for the driver, but I would like to add this information at
Spark application startup, both for the driver and the workers. Notice that I'm
working with a managed environment (Databricks), so I'm partially limited
in cluster management. One workaround to execute the MDC put of the parameter
on all workers is to use a broadcast variable and perform an action with it,
but I don't think it is stable, considering that this should also work if the
worker machine restarts or is replaced.

Thank you


RE: tcps oracle connection from spark

2019-06-19 Thread Luca Canali
Connecting to Oracle from Spark using the TCPS protocol works OK for me.
Maybe try to turn debug on with -Djavax.net.debug=all?
See also:
https://blogs.oracle.com/dev2dev/ssl-connection-to-oracle-db-using-jdbc%2c-tlsv12%2c-jks-or-oracle-wallets

Regards,
L.

From: Richard Xin 
Sent: Wednesday, June 19, 2019 00:51
To: User 
Subject: Re: tcps oracle connection from spark

and btw, same connection string works fine when used in SQL Developer.

On Tuesday, June 18, 2019, 03:49:24 PM PDT, Richard Xin <richardxin...@yahoo.com> wrote:


Hi, I need help with a tcps Oracle connection from Spark (version: 
spark-2.4.0-bin-hadoop2.7).


Properties prop = new Properties();
prop.putAll(sparkOracle);  // username/password

prop.put("javax.net.ssl.trustStore", "path to root.jks");
prop.put("javax.net.ssl.trustStorePassword", "password_here");

df.write()
.mode(SaveMode.Append)
.option("driver", "oracle.jdbc.driver.OracleDriver")
.jdbc("jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=tcps)(HOST=host.mycomapny.com)(PORT=1234)))(CONNECT_DATA=(SERVICE_NAME=service_name)))","tableName",
 prop)
;


note "PROTOCOL=tcps" in the connection string.

The code worked fine for "tcp" hosts, but some of our servers use "tcps" only. 
I got the following errors when hitting Oracle tcps hosts; can someone shed some 
light? Thanks a lot!

Exception in thread "main" java.sql.SQLRecoverableException: IO Error: Remote 
host terminated the handshake
at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:682)
at oracle.jdbc.driver.PhysicalConnection.<init>(PhysicalConnection.java:715)
at oracle.jdbc.driver.T4CConnection.<init>(T4CConnection.java:385)
at 
oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:30)
at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:564)
at 
org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:48)
at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:506)
at 
com.apple.jmet.pallas.data_migration.DirectMigrationWConfig.main(DirectMigrationWConfig.java:103)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit

RE: Spark Profiler

2019-03-27 Thread Luca Canali
I find that the Spark metrics system is quite useful to gather resource 
utilization metrics of Spark applications, including CPU, memory and I/O.
If you are interested, an example of how this works for us is at: 
https://db-blog.web.cern.ch/blog/luca-canali/2019-02-performance-dashboard-apache-spark
If instead you are rather looking at ways to instrument your Spark code with 
performance metrics, Spark task metrics and event listeners are quite useful 
for that. See also 
https://github.com/apache/spark/blob/master/docs/monitoring.md and 
https://github.com/LucaCanali/sparkMeasure

Regards,
Luca

From: manish ranjan 
Sent: Tuesday, March 26, 2019 15:24
To: Jack Kolokasis 
Cc: user 
Subject: Re: Spark Profiler

I have found Ganglia very helpful in understanding network I/O, CPU and memory 
usage for a given Spark cluster.
I have not used it, but have heard good things about Dr. Elephant (which I think 
was contributed by LinkedIn, but I'm not 100% sure).

On Tue, Mar 26, 2019, 5:59 AM Jack Kolokasis <koloka...@ics.forth.gr> wrote:
Hello all,

 I am looking for a spark profiler to trace my application to find
the bottlenecks. I need to trace CPU usage, Memory Usage and I/O usage.

I am looking forward for your reply.

--Iacovos


-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org>


RE: kerberos auth for MS SQL server jdbc driver

2018-10-15 Thread Luca Canali
We have a case where we interact with a Kerberized service and found a simple 
workaround to distribute and make use of the driver’s Kerberos credential cache 
file in  the executors. Maybe some of the ideas there can be of help for this 
case too? Our case in on Linux though. Details: 
https://github.com/LucaCanali/Miscellaneous/blob/master/Spark_Notes/Spark_Executors_Kerberos_HowTo.md

Regards,
Luca

From: Marcelo Vanzin 
Sent: Monday, October 15, 2018 18:32
To: foster.langb...@riskfrontiers.com
Cc: user 
Subject: Re: kerberos auth for MS SQL server jdbc driver

Spark only does Kerberos authentication on the driver. For executors it 
currently only supports Hadoop's delegation tokens for Kerberos.

To use something that does not support delegation tokens you have to manually 
manage the Kerberos login in your code that runs in executors, which might be 
tricky. It means distributing the keytab yourself (not with Spark's --keytab 
argument) and calling into the UserGroupInformation API directly.

I don't have any examples of that, though, maybe someone does. (We have a 
similar example for Kafka on our blog somewhere, but not sure how far that will 
get you with MS SQL.)


On Mon, Oct 15, 2018 at 12:04 AM Foster Langbein <foster.langb...@riskfrontiers.com> wrote:
Has anyone gotten spark to write to SQL server using Kerberos authentication 
with Microsoft's JDBC driver? I'm having limited success, though in theory it 
should work.

I'm using a YARN-mode 4-node Spark 2.3.0 cluster and trying to write a simple 
table to SQL Server 2016. I can get it to work if I use SQL server credentials, 
however this is not an option in my application. I need to use windows 
authentication - so-called integratedSecurity - and in particular I want to use 
a keytab file.

The solution half works - the spark driver creates a table on SQL server - so 
I'm pretty confident the Kerberos implementation/credentials etc are setup 
correctly and valid. However the executors then fail to write any data to the 
table with an exception: "java.security.PrivilegedActionException: 
GSSException: No valid credentials provided (Mechanism level: Failed to find 
any Kerberos tgt)"

After much tracing/debugging it seems executors are behaving differently from the 
Spark driver, ignoring the instruction to use the credentials supplied in 
the keytab and instead trying to use the default Spark cluster user. I simply 
haven't been able to force them to use what's in the keytab after trying many, 
many variations.

Very grateful if anyone has any help/suggestions/ideas on how to get this to 
work.


--



Dr Foster Langbein | Chief Technology Officer | Risk Frontiers

Level 2, 100 Christie St, St Leonards, NSW, 2065



Telephone: +61 2 8459 9777

Email: foster.langb...@riskfrontiers.com | Website: www.riskfrontiers.com



Risk Modelling | Risk Management | Resilience | Disaster Management | Social 
Research
Australia | New Zealand | Asia Pacific





--
Marcelo


Spark and Kafka direct approach problem

2016-05-04 Thread Luca Ferrari
Hi,

I’m new to Apache Spark and I’m trying to run the Spark Streaming + Kafka 
Integration Direct Approach example (JavaDirectKafkaWordCount.java).

I’ve downloaded all the libraries, but when I try to run it I get this error:


Exception in thread "main" java.lang.NoSuchMethodError: 
scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;

at kafka.api.RequestKeys$.<init>(RequestKeys.scala:48)

at kafka.api.RequestKeys$.<clinit>(RequestKeys.scala)

at kafka.api.TopicMetadataRequest.<init>(TopicMetadataRequest.scala:55)

at 
org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)

at 
org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)

at 
org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)

at 
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)

at 
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)

at 
org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)

at it.unimi.di.luca.SimpleApp.main(SimpleApp.java:53)

Any suggestions?

Cheers
Luca
 



R: How many disks for spark_local_dirs?

2016-04-18 Thread Luca Guerra
Hi Jan,
It's a physical server, I have launched the application with:
- "spark.cores.max": "12",
- "spark.executor.cores": "3"
- 2 GB RAM per worker
Spark version is 1.6.0, I don't use Hadoop.

Thanks,
Luca

-Original Message-
From: Jan Rock [mailto:r...@do-hadoop.com]
Sent: Friday, April 15, 2016 18:04
To: Luca Guerra 
Cc: user@spark.apache.org
Subject: Re: How many disks for spark_local_dirs?

Hi, 

is it physical server or AWS/Azure? What are the executed parameters for 
spark-shell command? Hadoop distro/version and Spark version?

Kind Regards,
Jan


> On 15 Apr 2016, at 16:15, luca_guerra  wrote:
> 
> Hi,
> I'm looking for a solution to improve my Spark cluster performances, I 
> have read from http://spark.apache.org/docs/latest/hardware-provisioning.html:
> "We recommend having 4-8 disks per node", I have tried both with one 
> and two disks but I have seen that with 2 disks the execution time is 
> doubled. Any explanations about this?
> 
> This is my configuration:
> 1 machine with 140 GB RAM 2 disks and 32 CPU (I know that is an 
> unusual
> configuration) and on this I have a standalone Spark cluster with 1 Worker.
> 
> Thank you very much for the help.
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-many-disks-for
> -spark-local-dirs-tp26790.html Sent from the Apache Spark User List 
> mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
> additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



R: How many disks for spark_local_dirs?

2016-04-18 Thread Luca Guerra
Hi Mich,
I have only 32 cores, I have tested with 2 GB of memory per worker to force 
spills to disk. My application had 12 cores and 3 cores per executor.

Thank you very much.
Luca

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Friday, April 15, 2016 18:56
To: Luca Guerra 
Cc: user @spark 
Subject: Re: How many disks for spark_local_dirs?

Is that 32 CPUs or 32 cores?

So in this configuration, assuming 32 cores, you have 1 worker with how much 
memory (deducting memory for the OS etc.) and 32 cores.

What is the ratio of memory per core in this case?

HTH


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 15 April 2016 at 16:15, luca_guerra <lgue...@bitbang.com> wrote:
Hi,
I'm looking for a solution to improve my Spark cluster performances, I have
read from http://spark.apache.org/docs/latest/hardware-provisioning.html:
"We recommend having 4-8 disks per node", I have tried both with one and two
disks but I have seen that with 2 disks the execution time is doubled. Any
explanations about this?

This is my configuration:
1 machine with 140 GB RAM 2 disks and 32 CPU (I know that is an unusual
configuration) and on this I have a standalone Spark cluster with 1 Worker.

Thank you very much for the help.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-many-disks-for-spark-local-dirs-tp26790.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.6.0 - token renew failure

2016-04-13 Thread Luca Rea
Hi,
I'm testing Livy server with Hue 3.9 and Spark 1.6.0 inside a kerberized 
cluster (HDP 2.4), when I run the command


/usr/java/jdk1.7.0_71//bin/java -Dhdp.version=2.4.0.0-169 -cp 
/usr/hdp/2.4.0.0-169/spark/conf/:/usr/hdp/2.4.0.0-169/spark/lib/spark-assembly-1.6.0.2.4.0.0-169-hadoop2.7.1.2.4.0.0-169.jar:/usr/hdp/2.4.0.0-169/spark/lib/datanucleus-core-3.2.10.jar:/usr/hdp/2.4.0.0-169/spark/lib/datanucleus-rdbms-3.2.9.jar:/usr/hdp/2.4.0.0-169/spark/lib/datanucleus-api-jdo-3.2.6.jar:/etc/hadoop/conf/:/usr/hdp/2.4.0.0-169/hadoop/lib/hadoop-lzo-0.6.0.2.4.0.0-169.jar
 -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --master yarn-cluster 
--conf spark.livy.port=0 --conf 
spark.livy.callbackUrl=http://172.16.24.26:8998/sessions/0/callback --conf 
spark.driver.extraJavaOptions=-Dhdp.version=2.4.0.0-169 --class 
com.cloudera.hue.livy.repl.Main --name Livy --proxy-user luca.rea 
/var/cloudera_hue/apps/spark/java/livy-assembly/target/scala-2.10/livy-assembly-0.2.0-SNAPSHOT.jar
 spark


This fails renewing the token  and returns the error below:


16/04/13 09:34:52 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load 
native-hadoop library for your platform... using builtin-java classes where 
applicable
16/04/13 09:34:53 INFO org.apache.hadoop.security.UserGroupInformation: Login 
successful for user spark-pantagr...@contactlab.lan using keytab file 
/etc/security/keytabs/spark.headless.keytab
16/04/13 09:34:54 INFO 
org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl: Timeline service 
address: http://pg-master04.contactlab.lan:8188/ws/v1/timeline/
16/04/13 09:34:54 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory: 
The short-circuit local reads feature cannot be used because libhadoop cannot 
be loaded.
16/04/13 09:34:55 INFO org.apache.hadoop.hdfs.DFSClient: Created 
HDFS_DELEGATION_TOKEN token 2135943 for luca.rea on ha-hdfs:pgha
Exception in thread "main" org.apache.hadoop.security.AccessControlException: 
luca.rea tries to renew a token with renewer spark
at 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.renewToken(AbstractDelegationTokenSecretManager.java:481)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renewDelegationToken(FSNamesystem.java:6793)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.renewDelegationToken(NameNodeRpcServer.java:635)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.renewDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:1005)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2151)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2147)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2145)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at 
org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at 
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
at org.apache.hadoop.hdfs.DFSClient$Renewer.renew(DFSClient.java:1147)
at org.apache.hadoop.security.token.Token.renew(Token.java:385)
at 
org.apache.spark.deploy.yarn.Client.getTokenRenewalInterval(Client.scala:593)
at org.apache.spark.deploy.yarn.Client.setupLaunchEnv(Client.scala:621)
at 
org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:721)
at 
org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:142)
at org.apache.spark.deploy.yarn.Client.run(Client.scala:1065)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1125)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache

Re: Help with collect() in Spark Streaming

2015-09-12 Thread Luca
I am trying to implement an application that requires the output to be
aggregated and stored as a single txt file on HDFS (instead of, for
instance, having 4 different txt files coming from my 4 workers).
The solution I used does the trick, but I can't tell whether it's OK to
regularly stress one of the workers with the writing. That is why I thought
about having the driver collect and store the data (a sketch of that
approach follows this thread).

Thanks for your patience and for your help, anyway. :)

2015-09-11 19:00 GMT+02:00 Holden Karau :

> Having the driver write the data instead of a worker probably won't spread
> it up, you still need to copy all of the data to a single node. Is there
> something which forces you to only write from a single node?
>
>
> On Friday, September 11, 2015, Luca  wrote:
>
>> Hi,
>> thanks for answering.
>>
>> With the *coalesce() *transformation a single worker is in charge of
>> writing to HDFS, but I noticed that the single write operation usually
>> takes too much time, slowing down the whole computation (this is
>> particularly true when 'unified' is made of several partitions). Besides,
>> 'coalesce' forces me to perform a further repartitioning ('true' flag), in
>> order not to lose upstream parallelism (by the way, did I get this part
>> right?).
>> Am I wrong in thinking that having the driver do the writing will speed
>> things up, without the need of repartitioning data?
>>
>> Hope I have been clear, I am pretty new to Spark. :)
>>
>> 2015-09-11 18:19 GMT+02:00 Holden Karau :
>>
>>> A common practice to do this is to use foreachRDD with a local var to
>>> accumulate the data (you can see it in the Spark Streaming test code).
>>>
>>> That being said, I am a little curious why you want the driver to create
>>> the file specifically.
>>>
>>> On Friday, September 11, 2015, allonsy  wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> I have a JavaPairDStream object and I'd like the
>>>> Driver to
>>>> create a txt file (on HDFS) containing all of its elements.
>>>>
>>>> At the moment, I use the /coalesce(1, true)/ method:
>>>>
>>>>
>>>> JavaPairDStream<String, String> unified = [partitioned stuff]
>>>> unified.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>>>>     public Void call(JavaPairRDD<String, String> arg0) throws Exception {
>>>>         arg0.coalesce(1, true).saveAsTextFile();
>>>>         return null;
>>>>     }
>>>> });
>>>>
>>>>
>>>> but this implies that a /single worker/ is taking all the data and
>>>> writing
>>>> to HDFS, and that could be a major bottleneck.
>>>>
>>>> How could I replace the worker with the Driver? I read that /collect()/
>>>> might do this, but I haven't the slightest idea on how to implement it.
>>>>
>>>> Can anybody help me?
>>>>
>>>> Thanks in advance.
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Help-with-collect-in-Spark-Streaming-tp24659.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>> --
>>> Cell : 425-233-8271
>>> Twitter: https://twitter.com/holdenkarau
>>> Linked In: https://www.linkedin.com/in/holdenkarau
>>>
>>>
>>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
> Linked In: https://www.linkedin.com/in/holdenkarau
>
>
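
A minimal Scala sketch of the collect-and-write idea discussed in this thread
(the Java API used above works the same way). collect() ships every element of
the batch to the driver, which then writes one text file per batch through the
Hadoop FileSystem API; the helper name, the /tmp output path and the
tab-separated format below are placeholders, and the approach is only workable
when each batch fits comfortably in driver memory.

import java.io.PrintWriter
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.dstream.DStream

// `unified` stands for the already-built (key, value) stream from the thread.
def writeEachBatchFromDriver(unified: DStream[(String, String)]): Unit = {
  unified.foreachRDD { rdd =>
    // Everything in the batch is copied to the driver: fine for small batches only.
    val elements = rdd.collect()
    // Default filesystem from the Hadoop configuration (HDFS on a cluster).
    val fs = FileSystem.get(new Configuration())
    val out = new Path(s"/tmp/stream-batch-${System.currentTimeMillis}.txt") // placeholder path
    val writer = new PrintWriter(fs.create(out))
    try elements.foreach { case (k, v) => writer.println(s"$k\t$v") }
    finally writer.close()
  }
}

As Holden points out above, this does not remove the bottleneck: all the data
still has to reach a single JVM, it is simply the driver instead of the worker
picked by coalesce(1, true), and the driver also pays the memory cost.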


Re: Help with collect() in Spark Streaming

2015-09-11 Thread Luca
Hi,
thanks for answering.

With the *coalesce() *transformation a single worker is in charge of
writing to HDFS, but I noticed that the single write operation usually
takes too much time, slowing down the whole computation (this is
particularly true when 'unified' is made of several partitions). Besides,
'coalesce' forces me to perform a further repartitioning ('true' flag), in
order not to lose upstream parallelism (by the way, did I get this part
right?).
Am I wrong in thinking that having the driver do the writing will speed
things up, without the need of repartitioning data?

Hope I have been clear, I am pretty new to Spark. :)

2015-09-11 18:19 GMT+02:00 Holden Karau :

> A common practice to do this is to use foreachRDD with a local var to
> accumulate the data (you can see it in the Spark Streaming test code).
>
> That being said, I am a little curious why you want the driver to create
> the file specifically.
>
> On Friday, September 11, 2015, allonsy  wrote:
>
>> Hi everyone,
>>
>> I have a JavaPairDStream object and I'd like the Driver
>> to
>> create a txt file (on HDFS) containing all of its elements.
>>
>> At the moment, I use the /coalesce(1, true)/ method:
>>
>>
>> JavaPairDStream<String, String> unified = [partitioned stuff]
>> unified.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>>     public Void call(JavaPairRDD<String, String> arg0) throws Exception {
>>         arg0.coalesce(1, true).saveAsTextFile();
>>         return null;
>>     }
>> });
>>
>>
>> but this implies that a /single worker/ is taking all the data and writing
>> to HDFS, and that could be a major bottleneck.
>>
>> How could I replace the worker with the Driver? I read that /collect()/
>> might do this, but I haven't the slightest idea on how to implement it.
>>
>> Can anybody help me?
>>
>> Thanks in advance.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Help-with-collect-in-Spark-Streaming-tp24659.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
> Linked In: https://www.linkedin.com/in/holdenkarau
>
>


Re: Kafka direct approach: blockInterval and topic partitions

2015-08-10 Thread Luca
Thank you! :)

2015-08-10 19:58 GMT+02:00 Cody Koeninger :

> There's no long-running receiver pushing blocks of messages, so
> blockInterval isn't relevant.
>
> Batch interval is what matters.
>
> On Mon, Aug 10, 2015 at 12:52 PM, allonsy  wrote:
>
>> Hi everyone,
>>
>> I recently started using the new Kafka direct approach.
>>
>> Now, as far as I understood, each Kafka partition /is/ an RDD partition
>> that
>> will be processed by a single core.
>> What I don't understand is the relation between those partitions and the
>> blocks generated every blockInterval.
>>
>> For example, assume:
>>
>> 1000ms batch interval
>> 16 topic partitions (total of 16 cores available)
>>
>> Moreover, we have that the blockInterval is set to 200ms.
>>
>> What am I actually dividing by the blockInterval value in such a scenario?
>> I'd like to tune this value but I cannot understand what it stands for.
>>
>> I hope I made myself clear,
>>
>> thank you all! :)
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-direct-approach-blockInterval-and-topic-partitions-tp24197.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
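
A minimal Scala sketch of the point above, assuming Spark 1.x with the
spark-streaming-kafka (Kafka 0.8) artifact; the application name, broker
address, topic name and 1-second batch interval are placeholders. With the
direct approach there is no receiver and nothing is split by blockInterval:
the stream is simply cut into batches of the interval set on the
StreamingContext, and each batch RDD has exactly one partition per Kafka
topic partition (so 16 topic partitions give 16 tasks per batch):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaDirectSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-direct-sketch")
    // The batch interval is the knob that matters; spark.streaming.blockInterval plays no role here.
    val ssc = new StreamingContext(conf, Seconds(1))

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder broker
    val topics = Set("events")                                      // placeholder topic

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // One RDD partition per Kafka partition, so this prints the topic's partition count.
    stream.foreachRDD(rdd => println(s"Kafka partitions in this batch: ${rdd.partitions.size}"))

    ssc.start()
    ssc.awaitTermination()
  }
}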


[no subject]

2015-02-18 Thread Luca Puggini



Re: generate a random matrix with uniform distribution

2015-02-09 Thread Luca Puggini
Thanks a lot!
Can I ask why this code generates a uniform distribution?

If dist is N(0,1), the data should be normally distributed with mean -1 (and
standard deviation 2), not uniform (a sketch that samples from a uniform
distribution directly appears after this thread).

Let me know.
Thanks,
Luca

2015-02-07 3:00 GMT+00:00 Burak Yavuz :

> Hi,
>
> You can do the following:
> ```
> import org.apache.spark.mllib.linalg.distributed.RowMatrix
> import org.apache.spark.mllib.random._
>
> // sc is the spark context, numPartitions is the number of partitions you
> want the RDD to be in
> val dist: RDD[Vector] = RandomRDDs.normalVectorRDD(sc, n, k,
> numPartitions, seed)
> // make the distribution uniform between (-1, 1)
> val data = dist.map(_ * 2  - 1)
> val matrix = new RowMatrix(data, n, k)
> On Feb 6, 2015 11:18 AM, "Donbeo"  wrote:
>
>> Hi
>> I would like to know how can I generate a random matrix where each element
>> come from a uniform distribution in -1, 1 .
>>
>> In particular I would like the matrix be a distributed row matrix with
>> dimension n x p
>>
>> Is this possible with mllib? Should I use another library?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/generate-a-random-matrix-with-uniform-distribution-tp21538.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
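
A minimal sketch that samples the entries from a uniform distribution directly,
assuming Spark 1.x MLlib and reusing the names from the snippet above (sc, n, k,
numPartitions, seed); the helper name is just a placeholder.
RandomRDDs.uniformVectorRDD draws each entry from Uniform(0, 1), and the
2 * x - 1 map rescales it to Uniform(-1, 1):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.random.RandomRDDs

// n x k distributed row matrix with i.i.d. Uniform(-1, 1) entries.
def uniformRowMatrix(sc: SparkContext, n: Long, k: Int,
                     numPartitions: Int, seed: Long): RowMatrix = {
  val dist = RandomRDDs.uniformVectorRDD(sc, n, k, numPartitions, seed)  // entries in [0, 1)
  val data = dist.map(v => Vectors.dense(v.toArray.map(x => 2 * x - 1))) // rescale to [-1, 1)
  new RowMatrix(data, n, k)
}

Rescaling normalVectorRDD output with 2 * x - 1, as in the snippet above, only
shifts and stretches the normal distribution (to mean -1, standard deviation 2);
it never makes it uniform.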


matrix of random variables with spark.

2015-02-06 Thread Luca Puggini
Hi all,
this is my first email to this mailing list, and I hope I am not doing
anything wrong.

I am currently trying to define a distributed matrix with n rows and k
columns where each element is randomly sampled from a uniform distribution.
How can I do that?

It would also be nice if you could suggest a good guide that I can use to
start working with Spark (the quick start tutorial is not enough for me).

Thanks a lot!