This 
(https://www.elastic.co/blog/benchmarking-and-sizing-your-elasticsearch-cluster-for-logs-and-metrics)
 has the math for sizing the cluster. There is a similar document 
(https://docs.aws.amazon.com/opensearch-service/latest/developerguide/sizing-domains.html)
 on sizing your cluster on AWS.

10MB per bulk api call is what elastic search recommends. I’m not sure how they 
came up with the number

Elastic search gives you the size of each index. I would load 10K records into 
ES, look at size of index, divide by 10K to roughly get the size of each row. 
This won’t give you an exact figure, but it will be in the ballpark.

From: Maksim Grinman <m...@resolute.ai>
Date: Friday, February 11, 2022 at 2:21 PM
To: "Lalwani, Jayesh" <jlalw...@amazon.com>
Cc: Mich Talebzadeh <mich.talebza...@gmail.com>, Holden Karau 
<hol...@pigscanfly.ca>, Sean Owen <sro...@gmail.com>, "user @spark" 
<user@spark.apache.org>
Subject: RE: [EXTERNAL] Spark 3.1.2 full thread dumps


CAUTION: This email originated from outside of the organization. Do not click 
links or open attachments unless you can confirm the sender and know the 
content is safe.


Thanks for these suggestions. Regarding hot nodes, are you referring to the 
same as in this article? 
https://www.elastic.co/blog/hot-warm-architecture-in-elasticsearch-5-x.
I am also curious where the 10MB heuristic came from, though I have heard a 
similar heuristic with respect to the size of a partition. I suspect the best 
way to see the size of a partition is simply to write to parquet and observe 
the size of the written parquet partitions?

Thanks

On Fri, Feb 11, 2022 at 12:48 PM Lalwani, Jayesh 
<jlalw...@amazon.com<mailto:jlalw...@amazon.com>> wrote:
You can probably tune writing to elastic search by

  1.  Increasing number of partitions so you are writing smaller batches of 
rows to elastic search
  2.  Using Elastic search’s bulk api
  3.  Scaling up the number of hot nodes on elastic search cluster to support 
writing in parallel.

You want to minimize long running tasks. Not just to avoid the “thread dump”. 
Large number of shorter running tasks are better than Small number of long 
running tasks, because you can scale up your processing by throwing hardware at 
it. This is subject to law of diminishing returns; ie; at some point making 
your tasks smaller will start slowing you down. You need to find the sweet spot.

Generally for elastic search, the sweet spot is that each task writes around 
10MB of data using the bulk API. Writing 10MB of data per task should be take 
order of few seconds. You won’t get the dreaded thread dump if your tasks are 
taking few seconds

From: Maksim Grinman <m...@resolute.ai<mailto:m...@resolute.ai>>
Date: Thursday, February 10, 2022 at 7:21 PM
To: "Lalwani, Jayesh" <jlalw...@amazon.com<mailto:jlalw...@amazon.com>>
Cc: Mich Talebzadeh 
<mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>>, Holden Karau 
<hol...@pigscanfly.ca<mailto:hol...@pigscanfly.ca>>, Sean Owen 
<sro...@gmail.com<mailto:sro...@gmail.com>>, "user @spark" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: RE: [EXTERNAL] Spark 3.1.2 full thread dumps


CAUTION: This email originated from outside of the organization. Do not click 
links or open attachments unless you can confirm the sender and know the 
content is safe.


That's fair, but I do get the same thread dump at the last step of the spark 
job, where we write the final dataframe into an elasticsearch index. It's a 
df.rdd.map(lambda r: r.asDict(True)).foreachPartition operation which takes a 
while and we usually get a thread dump during that as well.

On Mon, Feb 7, 2022 at 11:24 AM Lalwani, Jayesh 
<jlalw...@amazon.com<mailto:jlalw...@amazon.com>> wrote:
Probably not the answer you are looking for, but the best thing to do is to 
avoid making Spark code sleep. Is there a way you can predict how big your 
autoscaling group needs to be without looking at all the data? Are you using 
fixed number of Spark executors or are you have some way of scaling your 
executors? I am guessing that the size of your autoscaling group is 
proportional to the number of Spark executors. You can probably measure how 
many executors each can support. Then you can tie in the size of your 
autoscaling group to the number of executors.

Alternatively, you can build your service so a) it autoscales as load increases 
b) throttle requests when the load is higher than it can manage now. This means 
that when Spark executors start hitting your nodes, your service will throttle 
many of the requests, and start autoscaling up. Note that this is an 
established pattern in the cloud. This is how most services on AWS work. The 
end result is that initially there will be higher latency due to cold start, 
but the system will catch up eventually

From: Maksim Grinman <m...@resolute.ai<mailto:m...@resolute.ai>>
Date: Friday, February 4, 2022 at 9:35 PM
To: Mich Talebzadeh 
<mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>>
Cc: Holden Karau <hol...@pigscanfly.ca<mailto:hol...@pigscanfly.ca>>, Sean Owen 
<sro...@gmail.com<mailto:sro...@gmail.com>>, "user @spark" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: RE: [EXTERNAL] Spark 3.1.2 full thread dumps


CAUTION: This email originated from outside of the organization. Do not click 
links or open attachments unless you can confirm the sender and know the 
content is safe.


Not that this discussion is not interesting (it is), but this has strayed 
pretty far from my original question. Which was: How do I prevent spark from 
dumping huge Java Full Thread dumps when an executor appears to not be doing 
anything (in my case, there's a loop where it sleeps waiting for a service to 
come up). The service happens to be set up using an auto-scaling group, a 
coincidental and unimportant detail that seems to have derailed the 
conversation.

On Fri, Feb 4, 2022 at 7:18 PM Mich Talebzadeh 
<mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>> wrote:
OK basically, do we have a scenario where Spark or for that matter any cluster 
manager can deploy a new node (after the loss of  an existing node) with the 
view of running the failed tasks on the new executor(s) deployed on that newly 
spun node?




 Error! Filename not specified.  view my Linkedin 
profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Sat, 5 Feb 2022 at 00:00, Holden Karau 
<hol...@pigscanfly.ca<mailto:hol...@pigscanfly.ca>> wrote:
We don’t block scaling up after node failure in classic Spark if that’s the 
question.

On Fri, Feb 4, 2022 at 6:30 PM Mich Talebzadeh 
<mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>> wrote:
From what I can see in auto scaling setup, you will always need a min of two 
worker nodes as primary. It also states and I quote "Scaling primary workers is 
not recommended due to HDFS limitations which result in instability while 
scaling. These limitations do not exist for secondary workers". So the scaling 
comes with the secondary workers specifying the min and max instances. It also 
defaults to 2 minutes for the so-called auto scaling cooldown duration hence 
that delay observed. I presume task allocation to the new executors is FIFO for 
new tasks. This 
link<https://docs.qubole.com/en/latest/admin-guide/engine-admin/spark-admin/autoscale-spark.html#:~:text=dynamic%20allocation%20configurations.-,Autoscaling%20in%20Spark%20Clusters,scales%20down%20towards%20the%20minimum.&text=By%20default%2C%20Spark%20uses%20a%20static%20allocation%20of%20resources.>
 does some explanation on autoscaling.

Handling Spot Node Loss and Spot Blocks in Spark Clusters
"When the Spark AM receives the spot loss (Spot Node Loss or Spot Blocks) 
notification from the RM, it notifies the Spark driver. The driver then 
performs the following actions:
1.    Identifies all the executors affected by the upcoming node loss.
2.    Moves all of the affected executors to a decommissioning state, and no 
new tasks are scheduled on these executors.
3.    Kills all the executors after reaching 50% of the termination time.
4.    Starts the failed tasks (if any) on other executors.
5.    For these nodes, it removes all the entries of the shuffle data from the 
map output tracker on driver after reaching 90% of the termination time. This 
helps in preventing the shuffle-fetch failures due to spot loss.
6.    Recomputes the shuffle data from the lost node by stage resubmission and 
at the time shuffles data of spot node if required."

  1.
  2.  So basically when a node fails classic spark comes into play and no new 
nodes are added etc (no rescaling) and tasks are redistributed among the 
existing executors as I read it?
 Error! Filename not specified.  view my Linkedin 
profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 4 Feb 2022 at 13:55, Sean Owen 
<sro...@gmail.com<mailto:sro...@gmail.com>> wrote:
I have not seen stack traces under autoscaling, so not even sure what the error 
in question is.
There is always delay in acquiring a whole new executor in the cloud as it 
usually means a new VM is provisioned.
Spark treats the new executor like any other, available for executing tasks.

On Fri, Feb 4, 2022 at 4:28 AM Mich Talebzadeh 
<mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>> wrote:
Thanks for the info.

My concern has always been on how Spark handles autoscaling (adding new 
executors) when the load pattern changes.I have tried to test this with setting 
the following parameters (Spark 3.1.2 on GCP)

        spark-submit --verbose \
        .......
          --conf spark.dynamicAllocation.enabled="true" \
           --conf spark.shuffle.service.enabled="true" \
           --conf spark.dynamicAllocation.minExecutors=2 \
           --conf spark.dynamicAllocation.maxExecutors=10 \
           --conf spark.dynamicAllocation.initialExecutors=4 \

It is not very clear to me how Spark distributes tasks on the added executors 
and the source of delay. As you have observed there is a delay in adding new 
resources and allocating tasks. If that process is efficient?

Thanks

 Error! Filename not specified.  view my Linkedin 
profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 4 Feb 2022 at 03:04, Maksim Grinman 
<m...@resolute.ai<mailto:m...@resolute.ai>> wrote:
It's actually on AWS EMR. The job bootstraps and runs fine -- the autoscaling 
group is to bring up a service that spark will be calling. Some code waits for 
the autoscaling group to come up before continuing processing in Spark, since 
the Spark cluster will need to make requests to the service in the autoscaling 
group. It takes several minutes for the service to come up, and during the 
wait, Spark starts to show these thread dumps, as presumably it thinks 
something is wrong since the executor is busy waiting and not doing anything. 
The previous version of Spark did not do this (2.4.4).

On Thu, Feb 3, 2022 at 6:59 PM Mich Talebzadeh 
<mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>> wrote:
Sounds like you are running this on Google Dataproc cluster (spark 3.1.2)  with 
auto scaling policy?

 Can you describe if this happens before Spark starts a new job on the cluster 
or somehow half way through processing an existing job?

Also is the job involved doing Spark Structured Streaming?

HTH




 Error! Filename not specified.  view my Linkedin 
profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Thu, 3 Feb 2022 at 21:29, Maksim Grinman 
<m...@resolute.ai<mailto:m...@resolute.ai>> wrote:
We've got a spark task that, after some processing, starts an autoscaling group 
and waits for it to be up before continuing processing. While waiting for the 
autoscaling group, spark starts throwing full thread dumps, presumably at the 
spark.executor.heartbeat interval. Is there a way to prevent the thread dumps?

--
Maksim Grinman
VP Engineering
Resolute AI


--
Maksim Grinman
VP Engineering
Resolute AI
--
Twitter: https://twitter.com/holdenkarau
Books (Learning Spark, High Performance Spark, etc.): https://amzn.to/2MaRAG9 
<https://amzn.to/2MaRAG9>
YouTube Live Streams: https://www.youtube.com/user/holdenkarau


--
Maksim Grinman
VP Engineering
Resolute AI


--
Maksim Grinman
VP Engineering
Resolute AI


--
Maksim Grinman
VP Engineering
Resolute AI

Reply via email to