So, since the data is compressed, 164 GB of parquet can potentially explode to up to 1000 GB once decompressed (in practice it would be more like 400-600 GB).
Your executors have about 96 GB of memory each.
With that kind of volume, 100-300 executors is OK (I would run tests across the 100-300 range), but 30k shuffle partitions is still excessive.

From: Vitaliy Pisarev <vitaliy.pisa...@biocatch.com>
Date: Thursday, November 15, 2018 at 1:58 PM
To: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Cc: Shahbaz <shahzadh...@gmail.com>, user <user@spark.apache.org>, David 
Markovitz <dudu.markov...@microsoft.com>
Subject: Re: How to address seemingly low core utilization on a spark workload?

Small update, my initial estimate was incorrect. I have one location with 16 * 4 GB = 64 GB of parquet files (snappy-compressed) plus 20 * 5 GB = 100 GB of parquet files, so a total of 164 GB.

I am running on Databricks.
Here are some settings:

spark.executor.extraJavaOptions=-XX:ReservedCodeCacheSize=256m
  -XX:+UseCodeCacheFlushing -Ddatabricks.serviceName=spark-executor-1
  -javaagent:/databricks/DatabricksAgent.jar -XX:+PrintFlagsFinal
  -XX:+PrintGCDateStamps -verbose:gc -XX:+PrintGCDetails -Xss4m
  -Djavax.xml.datatype.DatatypeFactory=com.sun.org.apache.xerces.internal.jaxp.datatype.DatatypeFactoryImpl
  -Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl
  -Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl
  -Djavax.xml.validation.SchemaFactory:http://www.w3.org/2001/XMLSchema=com.sun.org.apache.xerces.internal.jaxp.validation.XMLSchemaFactory
  -Dorg.xml.sax.driver=com.sun.org.apache.xerces.internal.parsers.SAXParser
  -Dorg.w3c.dom.DOMImplementationSourceList=com.sun.org.apache.xerces.internal.dom.DOMXSImplementationSourceImpl
spark.executor.memory=107407m
spark.executor.tempDirectory=/local_disk0/tmp

These are the only relevant settings that I see set when looking at the logs, so I am guessing the others are simply left at their defaults.
Are there any settings I should pay special attention to? (A reference is also welcome.)

My assumption is that the Databricks runtime is already preconfigured with known best practices (like cores per executor...). Now that I think of it, I need to validate this assumption.
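
One way I could check this from the notebook is something like the following (a rough sketch I have not run; it assumes spark is the active SparkSession and the keys shown are just examples):

println(spark.conf.get("spark.sql.shuffle.partitions"))     // SQL conf, falls back to its default (200) if unset
println(spark.conf.get("spark.executor.cores", "not set"))  // returns "not set" if the key was never set
spark.sparkContext.getConf.getAll.sorted.foreach(println)   // everything explicitly set on the SparkConf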

On Thu, Nov 15, 2018 at 9:14 PM Thakrar, Jayesh 
<jthak...@conversantmedia.com> wrote:
While there is some merit to that thought process, I would steer away from 
premature JVM GC optimization of this kind.
What are the memory, cpu and other settings (e.g. any JVM/GC settings) for the 
executors and driver?
So assuming that you are reading about 16 files of say 2-4 GB each, that’s 
about 32-64 GB of (compressed?) data in parquet files.
Do you have access to the Spark UI – what is the peak memory that you see for 
the executors?
The UI will also give you the time spent on GC by each executor.
So even if you completely eliminated all GC, that’s the max time you can 
potentially save.


From: Vitaliy Pisarev <vitaliy.pisa...@biocatch.com>
Date: Thursday, November 15, 2018 at 1:03 PM
To: Shahbaz <shahzadh...@gmail.com>
Cc: "Thakrar, Jayesh" <jthak...@conversantmedia.com>, user <user@spark.apache.org>, "dudu.markov...@microsoft.com" <dudu.markov...@microsoft.com>
Subject: Re: How to address seemingly low core utilization on a spark workload?

Agreed, and I will try it. One clarification though: the number of partitions also affects their in-memory size, so fewer partitions may result in higher memory pressure and OOMs. I think this was the original intention.

So the motivation for partitioning is also to break the volume down to fit the machines.

Is this premise wrong?

On Thu, Nov 15, 2018, 19:49 Shahbaz 
<shahzadh...@gmail.com> wrote:
30k SQL shuffle partitions is extremely high. A core works on one partition at a time, and the default value of spark.sql.shuffle.partitions is 200. Set it to 300 or leave it at the default, see which one gives the best performance, and after you do that, see how the cores are being used.
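
For example, a quick way to run that comparison from a notebook could look roughly like this (just a sketch; it assumes spark is the active session and that you rerun the same workload after each change):

spark.conf.set("spark.sql.shuffle.partitions", "200")   // the default
// run the workload and note stage durations and shuffle sizes in the UI
spark.conf.set("spark.sql.shuffle.partitions", "300")
// run it again and compare before deciding whether 30000 buys you anything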

Regards,
Shahbaz

On Thu, Nov 15, 2018 at 10:58 PM Vitaliy Pisarev 
<vitaliy.pisa...@biocatch.com> wrote:
Oh, regarding shuffle.partitions being 30k, I don't know. I inherited the workload from an engineer who is no longer around and am trying to make sense of things in general.

On Thu, Nov 15, 2018 at 7:26 PM Vitaliy Pisarev 
<vitaliy.pisa...@biocatch.com> wrote:
The quest is dual:


  *   Increase utilisation - because cores cost money and I want to make sure that I fully utilise what I pay for. This is very blunt of course, because there is always I/O and at least some degree of skew. The bottom line is to do the same work in the same time but with fewer (and better utilised) resources.
  *   Reduce runtime by increasing parallelism.
While not the same, I am looking at these as two sides of the same coin.





On Thu, Nov 15, 2018 at 6:58 PM Thakrar, Jayesh 
<jthak...@conversantmedia.com> wrote:
For that little data, I find spark.sql.shuffle.partitions = 30000 to be very 
high.
Any reason for that high value?

Do you have a baseline observation with the default value?

Also, setting the job group and job description through the API and observing them in the UI will help you see which code snippets are running when you have low utilization.

Finally, high utilization does not equate to high efficiency.
It's very likely that for your workload, you may only need 16-128 executors.
I would suggest getting the partition count for the various 
datasets/dataframes/rdds in your code by using

dataset.rdd.getNumPartitions
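
For example (a rough sketch only - the dataframe name, path and column below are placeholders, not taken from your code):

val df = spark.read.parquet("/path/to/parquet")                       // placeholder path
println(s"input partitions:        ${df.rdd.getNumPartitions}")
val grouped = df.groupBy("some_key").count()                          // placeholder column
println(s"post-shuffle partitions: ${grouped.rdd.getNumPartitions}")  // driven by spark.sql.shuffle.partitions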

I would also suggest doing a number of tests with different numbers of executors.

But coming back to the objective behind your quest – are you trying to maximize utilization, hoping that higher parallelism will reduce your total runtime?


From: Vitaliy Pisarev <vitaliy.pisa...@biocatch.com>
Date: Thursday, November 15, 2018 at 10:07 AM
To: <jthak...@conversantmedia.com>
Cc: user <user@spark.apache.org>, David Markovitz <dudu.markov...@microsoft.com>
Subject: Re: How to address seemingly low core utilization on a spark workload?

I am working with parquet files and the metadata reading there is quite fast, as there are at most 16 files (a couple of gigs each).

I find it very hard to answer the question "how many partitions do you have?", since many Spark operations do not preserve partitioning and I have a lot of filtering and grouping going on.
What I can say is that I set spark.sql.shuffle.partitions to 30,000.

I am not worried that there are not enough partitions to keep the cores working. Having said that, I do see that high utilisation correlates heavily with shuffle read/write, whereas low utilisation correlates with no shuffling.
This leads me to the conclusion that, compared to the amount of shuffling, the cluster is doing very little work.

The question is: what can I do about it?

On Thu, Nov 15, 2018 at 5:29 PM Thakrar, Jayesh 
<jthak...@conversantmedia.com> wrote:
Can you shed more light on what kind of processing you are doing?

One common pattern that I have seen where active core/executor utilization drops to zero is while reading ORC data, when the driver seems (I think) to be doing schema validation.
In my case I would have hundreds of thousands of ORC data files and there is 
dead silence for about 1-2 hours.
I have tried providing a schema and disabling schema validation while reading 
the ORC data, but that does not seem to help (Spark 2.2.1).

And as you know, in most cases there is a linear relationship between the number of partitions in your data and the number of concurrently active executors.

Another thing I would suggest is to use the following two API calls/methods – they will annotate the Spark stages and jobs in the Spark UI with what is being executed.
SparkContext.setJobGroup(….)
SparkContext.setJobDescription(….)
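
A minimal usage sketch (the group id, descriptions, path and column here are placeholders, and spark is assumed to be the active SparkSession):

spark.sparkContext.setJobGroup("load-and-aggregate", "read parquet inputs and aggregate per key")
val input = spark.read.parquet("/path/to/parquet")          // placeholder path
spark.sparkContext.setJobDescription("count rows per key")  // tags the jobs triggered by the next action
input.groupBy("some_key").count().write.mode("overwrite").parquet("/path/to/output")  // placeholder logic
spark.sparkContext.clearJobGroup()                          // stop tagging subsequent jobs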

From: Vitaliy Pisarev <vitaliy.pisa...@biocatch.com>
Date: Thursday, November 15, 2018 at 8:51 AM
To: user <user@spark.apache.org>
Cc: David Markovitz <dudu.markov...@microsoft.com>
Subject: How to address seemingly low core utilization on a spark workload?

I have a workload that runs on a cluster of 300 cores.
Below is a plot of the number of active tasks over time during the execution of this workload:

[plot image not included in this message]

What I deduce is that there are substantial intervals where the cores are 
heavily under-utilised.

What actions can I take to:

  *   Increase the efficiency (== core utilisation) of the cluster?
  *   Understand the root causes behind the drops in core utilisation?
