I'm not sure that points 1 and 2 really apply to the kafka direct stream.
There are no receivers, and you know at the driver how big each of your
batches is.

On Thu, May 28, 2015 at 2:21 PM, Andrew Or <and...@databricks.com> wrote:

> Hi all,
>
> As the author of the dynamic allocation feature I can offer a few insights
> here.
>
> Gerard's explanation was both correct and concise: dynamic allocation is
> not intended to be used in Spark streaming at the moment (1.4 or before).
> This is because of two things:
>
> (1) Number of receivers is necessarily fixed, and these are started in
> executors. Since we need a receiver for each InputDStream, if we kill these
> receivers we essentially stop the stream, which is not what we want. It
> makes little sense to close and restart a stream the same way we kill and
> relaunch executors.
>
> (2) Records come in every batch, and when there is data to process your
> executors are not idle. If your idle timeout is less than the batch
> duration, then you'll end up having to constantly kill and restart
> executors. If your idle timeout is greater than the batch duration, then
> you'll never kill executors.
>
> Long answer short, with Spark streaming there is currently no
> straightforward way to scale the size of your cluster. I had a long
> discussion with TD (Spark streaming lead) about what needs to be done to
> provide some semblance of dynamic scaling to streaming applications, e.g.
> take into account the batch queue instead. We came up with a few ideas that
> I will not detail here, but we are looking into this and do intend to
> support it in the near future.
>
> -Andrew
>
>
>
> 2015-05-28 8:02 GMT-07:00 Evo Eftimov <evo.efti...@isecc.com>:
>
> Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK
>> – it will be your insurance policy against sys crashes due to memory leaks.
>> Until there is free RAM, spark streaming (spark) will NOT resort to disk –
>> and of course resorting to disk from time to time (ie when there is no free
>> RAM ) and taking a performance hit from that, BUT only until there is no
>> free RAM
>>
>>
>>
>> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
>> *Sent:* Thursday, May 28, 2015 2:34 PM
>> *To:* Evo Eftimov
>> *Cc:* Gerard Maas; spark users
>> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
>> sizes/rate of growth in Kafka or Spark's metrics?
>>
>>
>>
>> Evo, good points.
>>
>>
>>
>> On the dynamic resource allocation, I'm surmising this only works within
>> a particular cluster setup.  So it improves the usage of current cluster
>> resources but it doesn't make the cluster itself elastic. At least, that's
>> my understanding.
>>
>>
>>
>> Memory + disk would be good and hopefully it'd take *huge* load on the
>> system to start exhausting the disk space too.  I'd guess that falling onto
>> disk will make things significantly slower due to the extra I/O.
>>
>>
>>
>> Perhaps we'll really want all of these elements eventually.  I think we'd
>> want to start with memory only, keeping maxRate low enough not to overwhelm
>> the consumers; implement the cluster autoscaling.  We might experiment with
>> dynamic resource allocation before we get to implement the cluster
>> autoscale.
>>
>>
>>
>>
>>
>>
>>
>> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov <evo.efti...@isecc.com>
>> wrote:
>>
>> You can also try Dynamic Resource Allocation
>>
>>
>>
>>
>> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>>
>>
>>
>> Also re the Feedback Loop for automatic message consumption rate
>> adjustment – there is a “dumb” solution option – simply set the storage
>> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
>> exhausted spark streaming will resort to keeping new RDDs on disk which
>> will prevent it from crashing and hence loosing them. Then some memory will
>> get freed and it will resort back to RAM and so on and so forth
>>
>>
>>
>>
>>
>> Sent from Samsung Mobile
>>
>> -------- Original message --------
>>
>> From: Evo Eftimov
>>
>> Date:2015/05/28 13:22 (GMT+00:00)
>>
>> To: Dmitry Goldenberg
>>
>> Cc: Gerard Maas ,spark users
>>
>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>> growth in Kafka or Spark's metrics?
>>
>>
>>
>> You can always spin new boxes in the background and bring them into the
>> cluster fold when fully operational and time that with job relaunch and
>> param change
>>
>>
>>
>> Kafka offsets are mabaged automatically for you by the kafka clients
>> which keep them in zoomeeper dont worry about that ad long as you shut down
>> your job gracefuly. Besides msnaging the offsets explicitly is not a big
>> deal if necessary
>>
>>
>>
>>
>>
>> Sent from Samsung Mobile
>>
>>
>>
>> -------- Original message --------
>>
>> From: Dmitry Goldenberg
>>
>> Date:2015/05/28 13:16 (GMT+00:00)
>>
>> To: Evo Eftimov
>>
>> Cc: Gerard Maas ,spark users
>>
>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>> growth in Kafka or Spark's metrics?
>>
>>
>>
>> Thanks, Evo.  Per the last part of your comment, it sounds like we will
>> need to implement a job manager which will be in control of starting the
>> jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
>> marking them as ones to relaunch, scaling the cluster up/down by
>> adding/removing machines, and relaunching the 'suspended' (shut down) jobs.
>>
>>
>>
>> I suspect that relaunching the jobs may be tricky since that means
>> keeping track of the starter offsets in Kafka topic(s) from which the jobs
>> started working on.
>>
>>
>>
>> Ideally, we'd want to avoid a re-launch.  The 'suspension' and
>> relaunching of jobs, coupled with the wait for the new machines to come
>> online may turn out quite time-consuming which will make for lengthy
>> request times, and our requests are not asynchronous.  Ideally, the
>> currently running jobs would continue to run on the machines currently
>> available in the cluster.
>>
>>
>>
>> In the scale-down case, the job manager would want to signal to Spark's
>> job scheduler not to send work to the node being taken out, find out when
>> the last job has finished running on the node, then take the node out.
>>
>>
>>
>> This is somewhat like changing the number of cylinders in a car engine
>> while the car is running...
>>
>>
>>
>> Sounds like a great candidate for a set of enhancements in Spark...
>>
>>
>>
>> On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov <evo.efti...@isecc.com>
>> wrote:
>>
>> @DG; The key metrics should be
>>
>>
>>
>> -          Scheduling delay – its ideal state is to remain constant over
>> time and ideally be less than the time of the microbatch window
>>
>> -          The average job processing time should remain less than the
>> micro-batch window
>>
>> -          Number of Lost Jobs – even if there is a single Job lost that
>> means that you have lost all messages for the DStream RDD processed by that
>> job due to the previously described spark streaming memory leak condition
>> and subsequent crash – described in previous postings submitted by me
>>
>>
>>
>> You can even go one step further and periodically issue “get/check free
>> memory” to see whether it is decreasing relentlessly at a constant rate –
>> if it touches a predetermined RAM threshold that should be your third
>> metric
>>
>>
>>
>> Re the “back pressure” mechanism – this is a Feedback Loop mechanism and
>> you can implement one on your own without waiting for Jiras and new
>> features whenever they might be implemented by the Spark dev team –
>> moreover you can avoid using slow mechanisms such as ZooKeeper and even
>> incorporate some Machine Learning in your Feedback Loop to make it handle
>> the message consumption rate more intelligently and benefit from ongoing
>> online learning – BUT this is STILL about voluntarily sacrificing your
>> performance in the name of keeping your system stable – it is not about
>> scaling your system/solution
>>
>>
>>
>> In terms of how to scale the Spark Framework Dynamically – even though
>> this is not supported at the moment out of the box I guess you can have a
>> sys management framework spin dynamically a few more boxes (spark worker
>> nodes), stop dynamically your currently running Spark Streaming Job,
>> relaunch it with new params e.g. more Receivers, larger number of
>> Partitions (hence tasks), more RAM per executor etc. Obviously this will
>> cause some temporary delay in fact interruption in your processing but if
>> the business use case can tolerate that then go for it
>>
>>
>>
>> *From:* Gerard Maas [mailto:gerard.m...@gmail.com]
>> *Sent:* Thursday, May 28, 2015 12:36 PM
>> *To:* dgoldenberg
>> *Cc:* spark users
>> *Subject:* Re: Autoscaling Spark cluster based on topic sizes/rate of
>> growth in Kafka or Spark's metrics?
>>
>>
>>
>> Hi,
>>
>>
>>
>> tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark
>> streaming processes is not supported.
>>
>>
>>
>>
>>
>> *Longer version.*
>>
>>
>>
>> I assume that you are talking about Spark Streaming as the discussion is
>> about handing Kafka streaming data.
>>
>>
>>
>> Then you have two things to consider: the Streaming receivers and the
>> Spark processing cluster.
>>
>>
>>
>> Currently, the receiving topology is static. One receiver is allocated
>> with each DStream instantiated and it will use 1 core in the cluster. Once
>> the StreamingContext is started, this topology cannot be changed, therefore
>> the number of Kafka receivers is fixed for the lifetime of your DStream.
>>
>> What we do is to calculate the cluster capacity and use that as a fixed
>> upper bound (with a margin) for the receiver throughput.
>>
>>
>>
>> There's work in progress to add a reactive model to the receiver, where
>> backpressure can be applied to handle overload conditions. See
>> https://issues.apache.org/jira/browse/SPARK-7398
>>
>>
>>
>> Once the data is received, it will be processed in a 'classical' Spark
>> pipeline, so previous posts on spark resource scheduling might apply.
>>
>>
>>
>> Regarding metrics, the standard metrics subsystem of spark will report
>> streaming job performance. Check the driver's metrics endpoint to peruse
>> the available metrics:
>>
>>
>>
>> <driver>:<ui-port>/metrics/json
>>
>>
>>
>> -kr, Gerard.
>>
>>
>>
>>
>>
>> (*) Spark is a project that moves so fast that statements might be
>> invalidated by new work every minute.
>>
>>
>>
>> On Thu, May 28, 2015 at 1:21 AM, dgoldenberg <dgoldenberg...@gmail.com>
>> wrote:
>>
>> Hi,
>>
>> I'm trying to understand if there are design patterns for autoscaling
>> Spark
>> (add/remove slave machines to the cluster) based on the throughput.
>>
>> Assuming we can throttle Spark consumers, the respective Kafka topics we
>> stream data from would start growing.  What are some of the ways to
>> generate
>> the metrics on the number of new messages and the rate they are piling up?
>> This perhaps is more of a Kafka question; I see a pretty sparse javadoc
>> with
>> the Metric interface and not much else...
>>
>> What are some of the ways to expand/contract the Spark cluster? Someone
>> has
>> mentioned Mesos...
>>
>> I see some info on Spark metrics in  the Spark monitoring guide
>> <https://spark.apache.org/docs/latest/monitoring.html>  .  Do we want to
>> perhaps implement a custom sink that would help us autoscale up or down
>> based on the throughput?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>>
>>
>>
>>
>>
>
>

Reply via email to