That makes sense, especially if you have a cloud with “infinite” resources /
nodes, which lets you double, triple, etc. the resources of the currently
running cluster in the background, in parallel.

 

I was thinking more about the scenario where you have e.g. 100 boxes and want 
to / can add e.g. 20 more 

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Wednesday, June 3, 2015 4:46 PM
To: Evo Eftimov
Cc: Cody Koeninger; Andrew Or; Gerard Maas; spark users
Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of 
growth in Kafka or Spark's metrics?

 

Evo,

 

One of the ideas is to shadow the current cluster. This way there's no extra 
latency incurred due to shutting down of the consumers. If two sets of 
consumers are running, potentially processing the same data, that is OK. We 
phase out the older cluster and gradually flip over to the new one, ensuring no 
downtime or extra latency.  Thoughts?

 

On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov <evo.efti...@isecc.com> wrote:

You should monitor the vital performance / job-clogging stats of the Spark 
Streaming runtime, not the “kafka topics”.

 

You should be able to bring new worker nodes online and make them contact and 
register with the Master without bringing down the Master (or any of the 
currently running worker nodes) 

 

Then just shut down your currently running Spark Streaming job/app and restart 
it with new params to take advantage of the larger cluster.

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Wednesday, June 3, 2015 4:14 PM
To: Cody Koeninger
Cc: Andrew Or; Evo Eftimov; Gerard Maas; spark users
Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of 
growth in Kafka or Spark's metrics?

 

Would it be possible to implement Spark autoscaling somewhat along these lines? 
--

 

1. If we sense that a new machine is needed, by watching the data load in Kafka 
topic(s), then

2. Provision a new machine via a Provisioner interface (e.g. talk to AWS and 
get a machine);

3. Create a "shadow"/mirror Spark master running alongside the initial version 
which talks to N machines. The new mirror version is aware of N+1 machines (or 
N+M if we had decided we needed M new boxes).

4. The previous version of the Spark runtime is quiesced/decommissioned.  We 
possibly get both clusters working on the same data, which may actually be OK 
(at least for our specific use-cases).

5. Now the new Spark cluster is running.
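
The steps above can be sketched as a small orchestration routine. This is only 
an illustration of the proposed flow, not something Spark provides: the 
Provisioner interface, the Cluster class and FakeProvisioner are hypothetical 
stand-ins, and a real version would have to start and stop actual Spark masters 
and streaming jobs.

```python
from dataclasses import dataclass
from typing import List, Protocol


class Provisioner(Protocol):
    """Hypothetical interface; a real one might wrap a cloud provider's API."""

    def provision(self, count: int) -> List[str]: ...


@dataclass
class Cluster:
    """Stand-in for a Spark master plus the machines it knows about."""
    hosts: List[str]
    consuming: bool = False

    def start_consumers(self) -> None:
        self.consuming = True

    def drain_and_stop(self) -> None:
        # A real implementation would let the in-flight batch finish first.
        self.consuming = False


class FakeProvisioner:
    """In-memory provisioner used only to make the sketch runnable."""

    def __init__(self) -> None:
        self._next = 0

    def provision(self, count: int) -> List[str]:
        hosts = [f"new-host-{self._next + i}" for i in range(count)]
        self._next += count
        return hosts


def scale_up_via_shadow(old: Cluster, provisioner: Provisioner, extra: int) -> Cluster:
    new_hosts = provisioner.provision(extra)       # step 2: get M machines
    shadow = Cluster(hosts=old.hosts + new_hosts)  # step 3: aware of N+M
    shadow.start_consumers()                       # both clusters consume briefly
    old.drain_and_stop()                           # step 4: retire the old one
    return shadow                                  # step 5: the new cluster


old = Cluster(hosts=["box-1", "box-2"], consuming=True)
new = scale_up_via_shadow(old, FakeProvisioner(), extra=1)
print(new.hosts, new.consuming, old.consuming)
```

Note that between starting the shadow consumers and stopping the old ones both 
clusters may process the same data, which is the overlap described in step 4.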

 

Similarly, the decommissioning of M unused boxes would happen, via this notion 
of a mirror Spark runtime.  How feasible would it be for such a mirrorlike 
setup to be created, especially created programmatically?  Especially point #3.

 

The other idea we'd entertained was to bring in a new machine, quiesce all 
currently running workers by telling them to process their current batch and 
then shut down, then restart the consumers now that Spark is aware of a 
modified cluster.  This has the drawback of a downtime that may not be 
tolerable in terms of latency for the system's clients, which wait for their 
responses in a synchronous fashion.

 

Thanks.

 

On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger <c...@koeninger.org> wrote:

I'm not sure that points 1 and 2 really apply to the kafka direct stream.  
There are no receivers, and you know at the driver how big each of your batches 
is.

 

On Thu, May 28, 2015 at 2:21 PM, Andrew Or <and...@databricks.com> wrote:

Hi all,

 

As the author of the dynamic allocation feature I can offer a few insights here.

 

Gerard's explanation was both correct and concise: dynamic allocation is not 
intended to be used in Spark streaming at the moment (1.4 or before). This is 
because of two things:

 

(1) Number of receivers is necessarily fixed, and these are started in 
executors. Since we need a receiver for each InputDStream, if we kill these 
receivers we essentially stop the stream, which is not what we want. It makes 
little sense to close and restart a stream the same way we kill and relaunch 
executors.

 

(2) Records come in every batch, and when there is data to process your 
executors are not idle. If your idle timeout is less than the batch duration, 
then you'll end up having to constantly kill and restart executors. If your 
idle timeout is greater than the batch duration, then you'll never kill 
executors.
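
The trade-off in point (2) can be made concrete with a toy model. This is a 
sketch under simplifying assumptions that are not from the thread: one executor, 
a fixed processing time at the start of every batch, and removal as soon as the 
idle gap before the next batch reaches the idle timeout (the setting Spark calls 
spark.dynamicAllocation.executorIdleTimeout). The function name is made up.

```python
def executor_removals(batch_interval_s: float,
                      processing_time_s: float,
                      idle_timeout_s: float,
                      batches: int) -> int:
    """Count how often a single executor would be removed for being idle."""
    # Time the executor sits idle between finishing one batch and the next.
    idle_gap_s = max(0.0, batch_interval_s - processing_time_s)
    # If the gap reaches the timeout, the executor is removed (and must be
    # relaunched) once per batch; otherwise it is never removed.
    return batches if idle_gap_s >= idle_timeout_s else 0


# Short timeout: the executor is killed and restarted on every batch.
print(executor_removals(10.0, 4.0, idle_timeout_s=5.0, batches=100))
# Timeout longer than the batch interval: the executor is never released.
print(executor_removals(10.0, 4.0, idle_timeout_s=60.0, batches=100))
```

Either outcome is unhelpful for streaming, which is the point being made: the 
idle-time signal does not reflect whether the stream actually needs more or 
fewer executors.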

 

Long answer short, with Spark streaming there is currently no straightforward 
way to scale the size of your cluster. I had a long discussion with TD (Spark 
streaming lead) about what needs to be done to provide some semblance of 
dynamic scaling to streaming applications, e.g. take into account the batch 
queue instead. We came up with a few ideas that I will not detail here, but we 
are looking into this and do intend to support it in the near future.

 

-Andrew

 

 

 

2015-05-28 8:02 GMT-07:00 Evo Eftimov <evo.efti...@isecc.com>:

 

You should probably ALWAYS keep the RDD storage policy at MEMORY AND DISK; it 
will be your insurance policy against system crashes due to memory leaks. As 
long as there is free RAM, Spark Streaming (Spark) will NOT resort to disk. It 
will of course resort to disk from time to time (i.e. when there is no free 
RAM) and take a performance hit from that, BUT only for as long as there is no 
free RAM.

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Thursday, May 28, 2015 2:34 PM
To: Evo Eftimov
Cc: Gerard Maas; spark users
Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of 
growth in Kafka or Spark's metrics?

 

Evo, good points.

 

On the dynamic resource allocation, I'm surmising this only works within a 
particular cluster setup.  So it improves the usage of current cluster 
resources but it doesn't make the cluster itself elastic. At least, that's my 
understanding.

 

Memory + disk would be good and hopefully it'd take *huge* load on the system 
to start exhausting the disk space too.  I'd guess that falling onto disk will 
make things significantly slower due to the extra I/O.

 

Perhaps we'll really want all of these elements eventually.  I think we'd want 
to start with memory only, keeping maxRate low enough not to overwhelm the 
consumers; implement the cluster autoscaling.  We might experiment with dynamic 
resource allocation before we get to implement the cluster autoscale.

 

 

 

On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov <evo.efti...@isecc.com> wrote:

You can also try Dynamic Resource Allocation

 

https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
 

 

Also, re the Feedback Loop for automatic message consumption rate adjustment: 
there is a “dumb” solution option. Simply set the storage policy for the 
DStream RDDs to MEMORY AND DISK. When the memory gets exhausted, Spark 
Streaming will resort to keeping new RDDs on disk, which will prevent it from 
crashing and hence losing them. Then some memory will get freed and it will 
go back to RAM, and so on and so forth.

 

 

Sent from Samsung Mobile

-------- Original message --------

From: Evo Eftimov 

Date:2015/05/28 13:22 (GMT+00:00) 

To: Dmitry Goldenberg 

Cc: Gerard Maas ,spark users 

Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics? 

 

You can always spin up new boxes in the background and bring them into the 
cluster fold when fully operational, and time that with the job relaunch and 
param change.

 

Kafka offsets are managed automatically for you by the Kafka clients, which 
keep them in ZooKeeper. Don't worry about that as long as you shut down your 
job gracefully. Besides, managing the offsets explicitly is not a big deal if 
necessary.

 

 

Sent from Samsung Mobile

 

-------- Original message --------

From: Dmitry Goldenberg 

Date:2015/05/28 13:16 (GMT+00:00) 

To: Evo Eftimov 

Cc: Gerard Maas ,spark users 

Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics? 

 

Thanks, Evo.  Per the last part of your comment, it sounds like we will need to 
implement a job manager which will be in control of starting the jobs, 
monitoring the status of the Kafka topic(s), shutting jobs down and marking 
them as ones to relaunch, scaling the cluster up/down by adding/removing 
machines, and relaunching the 'suspended' (shut down) jobs.

 

I suspect that relaunching the jobs may be tricky, since that means keeping 
track of the starting offsets in the Kafka topic(s) from which the jobs began 
working.
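
As a rough illustration of what "keeping track of the offsets" could involve, 
here is a minimal sketch that saves and restores per-partition offsets in a 
JSON file. Everything in it is an assumption for illustration: the file format, 
the function names and the choice of a local file rather than ZooKeeper or a 
database. It does not talk to Kafka itself.

```python
import json
import os
import tempfile
from typing import Dict, Tuple

# (topic, partition) -> next offset the relaunched job should read from
Offsets = Dict[Tuple[str, int], int]


def save_offsets(path: str, offsets: Offsets) -> None:
    payload = [
        {"topic": topic, "partition": partition, "offset": offset}
        for (topic, partition), offset in sorted(offsets.items())
    ]
    # Write to a temp file and rename it, so that a crash mid-write cannot
    # leave a half-written offsets file behind.
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as handle:
        json.dump(payload, handle)
    os.replace(tmp_path, path)


def load_offsets(path: str) -> Offsets:
    if not os.path.exists(path):
        return {}  # first launch: nothing recorded yet
    with open(path) as handle:
        return {
            (entry["topic"], entry["partition"]): entry["offset"]
            for entry in json.load(handle)
        }
```

A job manager would call save_offsets after each successfully processed batch 
and load_offsets just before relaunching the job on the resized cluster.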

 

Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching of 
jobs, coupled with the wait for the new machines to come online may turn out 
quite time-consuming which will make for lengthy request times, and our 
requests are not asynchronous.  Ideally, the currently running jobs would 
continue to run on the machines currently available in the cluster.

 

In the scale-down case, the job manager would want to signal to Spark's job 
scheduler not to send work to the node being taken out, find out when the last 
job has finished running on the node, then take the node out.
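
The scale-down sequence described here (stop sending work, wait for the last 
job, remove the node) can be sketched as simple bookkeeping. The DrainTracker 
class below is hypothetical and is not a Spark API; it only shows the state a 
job manager would need to keep.

```python
from collections import Counter


class DrainTracker:
    """Tracks running tasks per node and which nodes are being drained."""

    def __init__(self) -> None:
        self._running = Counter()
        self._draining = set()

    def start_draining(self, node: str) -> None:
        # From now on the scheduler should not send new work to this node.
        self._draining.add(node)

    def can_schedule_on(self, node: str) -> bool:
        return node not in self._draining

    def task_started(self, node: str) -> None:
        if node in self._draining:
            raise ValueError(f"{node} is draining; no new work allowed")
        self._running[node] += 1

    def task_finished(self, node: str) -> None:
        if self._running[node] <= 0:
            raise ValueError(f"no running task recorded for {node}")
        self._running[node] -= 1

    def safe_to_remove(self, node: str) -> bool:
        # Safe only once draining has begun and the last task has finished.
        return node in self._draining and self._running[node] == 0
```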

 

This is somewhat like changing the number of cylinders in a car engine while 
the car is running...

 

Sounds like a great candidate for a set of enhancements in Spark...

 

On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov <evo.efti...@isecc.com> wrote:

@DG; The key metrics should be

 

- Scheduling delay: ideally it remains constant over time and stays below the 
length of the micro-batch window

- The average job processing time should remain less than the micro-batch 
window

- Number of lost jobs: even a single lost job means that you have lost all 
messages for the DStream RDD processed by that job, due to the previously 
described Spark Streaming memory leak condition and subsequent crash (described 
in previous postings submitted by me)

 

You can even go one step further and periodically issue “get/check free memory” 
to see whether it is decreasing relentlessly at a constant rate. If it touches 
a predetermined RAM threshold, that should be an additional metric.
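
A minimal sketch of how the metrics listed above might be turned into a 
"needs more capacity" signal. The function name, the half-versus-half trend 
check and the 0.5 second tolerance are illustrative assumptions, not 
recommendations from this thread; how the samples are collected is left out.

```python
from statistics import mean
from typing import Sequence


def needs_more_capacity(batch_interval_s: float,
                        scheduling_delays_s: Sequence[float],
                        processing_times_s: Sequence[float],
                        lost_jobs: int = 0,
                        growth_tolerance_s: float = 0.5) -> bool:
    # Any lost job means lost messages, so treat it as an immediate signal.
    if lost_jobs > 0:
        return True
    # Average processing time must stay below the micro-batch window.
    if processing_times_s and mean(processing_times_s) >= batch_interval_s:
        return True
    # The latest scheduling delay should be below the micro-batch window.
    if scheduling_delays_s and scheduling_delays_s[-1] >= batch_interval_s:
        return True
    # Scheduling delay should stay roughly constant: compare the newer half
    # of the samples with the older half.
    half = len(scheduling_delays_s) // 2
    if half == 0:
        return False
    older = scheduling_delays_s[:half]
    newer = scheduling_delays_s[half:]
    return mean(newer) - mean(older) > growth_tolerance_s


print(needs_more_capacity(10.0, [0.1, 0.2, 0.1, 0.2], [4.0, 5.0, 4.0]))
print(needs_more_capacity(10.0, [1.0, 2.0, 4.0, 8.0], [9.0, 9.0, 9.0]))
```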

 

Re the “back pressure” mechanism: this is a Feedback Loop mechanism, and you 
can implement one on your own without waiting for Jiras and new features, 
whenever they might be implemented by the Spark dev team. Moreover, you can 
avoid using slow mechanisms such as ZooKeeper and even incorporate some Machine 
Learning in your Feedback Loop to make it handle the message consumption rate 
more intelligently and benefit from ongoing online learning. BUT this is STILL 
about voluntarily sacrificing your performance in the name of keeping your 
system stable; it is not about scaling your system/solution.
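
A home-grown feedback loop of this kind could be as simple as the sketch below, 
which scales the allowed ingestion rate by the ratio of a target processing 
time to the observed one. The function name, the 0.9 headroom and the rate 
bounds are assumptions chosen for illustration; feeding the result back into a 
running job is not shown.

```python
def next_max_rate(current_rate: float,
                  batch_interval_s: float,
                  processing_time_s: float,
                  min_rate: float = 100.0,
                  max_rate: float = 100_000.0,
                  headroom: float = 0.9) -> float:
    """Propose a new records-per-second limit from the last batch's timing."""
    if processing_time_s <= 0:
        return current_rate  # no usable measurement; keep the current limit
    # Aim to finish each batch within a fraction of the batch interval.
    target_s = headroom * batch_interval_s
    proposed = current_rate * target_s / processing_time_s
    # Clamp so that one odd measurement cannot swing the rate too far.
    return max(min_rate, min(max_rate, proposed))


# Batches take 18 s against a 10 s window: the limit is lowered.
print(next_max_rate(1000.0, 10.0, 18.0))
# Batches take 4.5 s: there is room to raise the limit.
print(next_max_rate(1000.0, 10.0, 4.5))
```

As noted above, lowering the rate only keeps the system stable by letting the 
backlog grow upstream; it adds no capacity.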

 

In terms of how to scale the Spark framework dynamically: even though this is 
not supported out of the box at the moment, I guess you can have a sys 
management framework dynamically spin up a few more boxes (Spark worker nodes), 
dynamically stop your currently running Spark Streaming job, and relaunch it 
with new params, e.g. more Receivers, a larger number of Partitions (hence 
tasks), more RAM per executor, etc. Obviously this will cause some temporary 
delay, in fact an interruption, in your processing, but if the business use 
case can tolerate that then go for it.

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Thursday, May 28, 2015 12:36 PM
To: dgoldenberg
Cc: spark users
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics?

 

Hi,

 

tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark 
streaming processes is not supported. 

 

 

Longer version.

 

I assume that you are talking about Spark Streaming as the discussion is about 
handling Kafka streaming data.

 

Then you have two things to consider: the Streaming receivers and the Spark 
processing cluster.

 

Currently, the receiving topology is static. One receiver is allocated for 
each DStream instantiated, and it will use 1 core in the cluster. Once the 
StreamingContext is started, this topology cannot be changed; therefore the 
number of Kafka receivers is fixed for the lifetime of your DStream. 

What we do is to calculate the cluster capacity and use that as a fixed upper 
bound (with a margin) for the receiver throughput.
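
The capacity calculation might look roughly like this. It is a back-of-the- 
envelope sketch: the per-core throughput figure has to be measured for your own 
job, and the 20% margin and the function name are assumptions. It relies only 
on the point above that each receiver occupies one core.

```python
def receiver_rate_cap(total_cores: int,
                      num_receivers: int,
                      records_per_core_per_s: float,
                      margin: float = 0.2) -> float:
    """Upper bound on records/s per receiver that the cluster can absorb."""
    # Each receiver pins one core, so only the rest is left for processing.
    processing_cores = total_cores - num_receivers
    if num_receivers <= 0 or processing_cores <= 0:
        raise ValueError("need at least one receiver and one processing core")
    cluster_capacity = processing_cores * records_per_core_per_s
    # Keep a safety margin, then split the budget across the receivers.
    return cluster_capacity * (1.0 - margin) / num_receivers


# 10 cores, 2 receivers, 1000 records/s per core measured for this job.
print(receiver_rate_cap(10, 2, 1000.0))
```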

 

There's work in progress to add a reactive model to the receiver, where 
backpressure can be applied to handle overload conditions. See 
https://issues.apache.org/jira/browse/SPARK-7398

 

Once the data is received, it will be processed in a 'classical' Spark 
pipeline, so previous posts on spark resource scheduling might apply.

 

Regarding metrics, the standard metrics subsystem of spark will report 
streaming job performance. Check the driver's metrics endpoint to peruse the 
available metrics:

 

<driver>:<ui-port>/metrics/json
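
A small sketch of reading that endpoint and picking out gauges by name. It 
assumes the response is JSON with a top-level "gauges" object whose entries 
carry a "value" field; check the actual payload of your Spark version before 
relying on that. The metric names in the sample payload are made up for 
illustration.

```python
import json
from typing import Any, Dict
from urllib.request import urlopen


def fetch_metrics(driver_host: str, ui_port: int, timeout_s: float = 5.0) -> Dict[str, Any]:
    """Fetch the driver's metrics document and parse it as JSON."""
    url = f"http://{driver_host}:{ui_port}/metrics/json"
    with urlopen(url, timeout=timeout_s) as response:
        return json.load(response)


def gauges_matching(metrics: Dict[str, Any], needle: str) -> Dict[str, Any]:
    """Return {gauge name: value} for gauges whose name contains `needle`."""
    gauges = metrics.get("gauges", {})
    return {
        name: gauge.get("value")
        for name, gauge in gauges.items()
        if needle in name
    }


# Illustrative payload only; real metric names will differ.
sample = {
    "gauges": {
        "app.driver.demo.streaming.schedulingDelay": {"value": 120},
        "app.driver.demo.streaming.processingDelay": {"value": 900},
        "app.driver.BlockManager.memory.remainingMem_MB": {"value": 512},
    }
}
print(gauges_matching(sample, "streaming"))
```

In a live setup, gauges_matching(fetch_metrics(host, port), "streaming") could 
be polled periodically to feed a monitoring or scaling decision.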

 

-kr, Gerard.

 

 

(*) Spark is a project that moves so fast that statements might be invalidated 
by new work every minute.

 

On Thu, May 28, 2015 at 1:21 AM, dgoldenberg <dgoldenberg...@gmail.com> wrote:

Hi,

I'm trying to understand if there are design patterns for autoscaling Spark
(add/remove slave machines to the cluster) based on the throughput.

Assuming we can throttle Spark consumers, the respective Kafka topics we
stream data from would start growing.  What are some of the ways to generate
the metrics on the number of new messages and the rate they are piling up?
This perhaps is more of a Kafka question; I see a pretty sparse javadoc with
the Metric interface and not much else...
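
One way to frame "number of new messages and the rate they are piling up" is as 
consumer lag and its growth over time. The sketch below assumes you can already 
obtain, per partition, the latest offset in the topic and the offset your 
consumers have reached; how to fetch those from Kafka is not shown, and the 
function names are made up.

```python
from typing import Dict, List, Tuple


def total_lag(latest_offsets: Dict[int, int],
              consumed_offsets: Dict[int, int]) -> int:
    """Messages written but not yet consumed, summed over partitions."""
    return sum(
        # A partition with no recorded progress counts as fully unconsumed.
        max(0, latest - consumed_offsets.get(partition, 0))
        for partition, latest in latest_offsets.items()
    )


def lag_growth_per_s(samples: List[Tuple[float, int]]) -> float:
    """Average lag growth between the first and last (timestamp_s, lag) sample."""
    if len(samples) < 2:
        return 0.0
    (t_first, lag_first), (t_last, lag_last) = samples[0], samples[-1]
    if t_last <= t_first:
        raise ValueError("samples must be ordered by increasing timestamp")
    return (lag_last - lag_first) / (t_last - t_first)


print(total_lag({0: 100, 1: 50}, {0: 90, 1: 50}))
print(lag_growth_per_s([(0.0, 10), (10.0, 110)]))
```

A persistently positive growth rate means the consumers are falling behind, 
which is the kind of signal an autoscaler could act on.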

What are some of the ways to expand/contract the Spark cluster? Someone has
mentioned Mesos...

I see some info on Spark metrics in  the Spark monitoring guide
<https://spark.apache.org/docs/latest/monitoring.html>  .  Do we want to
perhaps implement a custom sink that would help us autoscale up or down
based on the throughput?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
