I'm not sure that points 1 and 2 really apply to the Kafka direct stream. There are no receivers, and you know at the driver how big each of your batches is.
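For context, a minimal sketch of the direct approach, assuming the Spark 1.3+ spark-streaming-kafka module (the broker address and topic name are hypothetical): the driver computes each batch's offset ranges, and hence its exact size, up front.

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    val conf = new SparkConf().setAppName("direct-stream-sketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    // No receivers: the driver itself asks Kafka for the latest offsets
    // at the start of each batch interval.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // hypothetical broker
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events")) // hypothetical topic

    stream.foreachRDD { rdd =>
      // The offset ranges tell you, at the driver, exactly how big this batch is.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val total = ranges.map(r => r.untilOffset - r.fromOffset).sum
      println(s"batch holds $total records across ${ranges.length} partitions")
    }

    ssc.start()
    ssc.awaitTermination()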
On Thu, May 28, 2015 at 2:21 PM, Andrew Or <and...@databricks.com> wrote:

Hi all,

As the author of the dynamic allocation feature I can offer a few insights here.

Gerard's explanation was both correct and concise: dynamic allocation is not intended to be used in Spark Streaming at the moment (1.4 or before). This is because of two things:

(1) The number of receivers is necessarily fixed, and these are started in executors. Since we need a receiver for each InputDStream, if we kill these receivers we essentially stop the stream, which is not what we want. It makes little sense to close and restart a stream the same way we kill and relaunch executors.

(2) Records come in every batch, and when there is data to process your executors are not idle. If your idle timeout is less than the batch duration, then you'll end up constantly killing and restarting executors. If your idle timeout is greater than the batch duration, then you'll never kill executors.

Long story short, with Spark Streaming there is currently no straightforward way to scale the size of your cluster. I had a long discussion with TD (Spark Streaming lead) about what needs to be done to provide some semblance of dynamic scaling to streaming applications, e.g. taking the batch queue into account. We came up with a few ideas that I will not detail here, but we are looking into this and do intend to support it in the near future.

-Andrew

2015-05-28 8:02 GMT-07:00 Evo Eftimov <evo.efti...@isecc.com>:

Probably you should ALWAYS keep the RDD storage policy at MEMORY_AND_DISK – it will be your insurance policy against system crashes due to memory leaks. As long as there is free RAM, Spark Streaming will NOT resort to disk; it will spill to disk from time to time (i.e. when there is no free RAM) and take a performance hit from that, but only until RAM becomes available again.

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
Sent: Thursday, May 28, 2015 2:34 PM
To: Evo Eftimov
Cc: Gerard Maas; spark users
Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

Evo, good points.

On the dynamic resource allocation, I'm surmising this only works within a particular cluster setup. So it improves the usage of current cluster resources, but it doesn't make the cluster itself elastic. At least, that's my understanding.

Memory + disk would be good, and hopefully it would take a *huge* load on the system to start exhausting the disk space too. I'd guess that falling back onto disk will make things significantly slower due to the extra I/O.

Perhaps we'll really want all of these elements eventually. I think we'd want to start with memory only, keeping maxRate low enough not to overwhelm the consumers, then implement the cluster autoscaling. We might experiment with dynamic resource allocation before we get to implementing the cluster autoscale.
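A minimal sketch of the two knobs discussed above, assuming a receiver-based Kafka stream (the ZooKeeper quorum, consumer group, topic name, and rate value are all illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf()
      .setAppName("mem-and-disk-sketch")
      // Throttle each receiver; 1000 records/sec is an illustrative value.
      .set("spark.streaming.receiver.maxRate", "1000")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Receiver-based Kafka stream with an explicit storage level: when RAM
    // fills up, received blocks spill to disk instead of being dropped.
    val stream = KafkaUtils.createStream(
      ssc,
      "zk1:2181",          // hypothetical ZooKeeper quorum
      "my-consumer-group", // hypothetical consumer group
      Map("events" -> 1),  // hypothetical topic -> receiver thread count
      StorageLevel.MEMORY_AND_DISK_SER_2)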
On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov <evo.efti...@isecc.com> wrote:

You can also try Dynamic Resource Allocation:

https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation

Also, re the feedback loop for automatic message consumption rate adjustment – there is a "dumb" solution option: simply set the storage policy for the DStream RDDs to MEMORY_AND_DISK. When memory gets exhausted, Spark Streaming will resort to keeping new RDDs on disk, which will prevent it from crashing and hence losing them. Then some memory will get freed and it will switch back to RAM, and so on and so forth.

-------- Original message --------
From: Evo Eftimov
Date: 2015/05/28 13:22 (GMT+00:00)
To: Dmitry Goldenberg
Cc: Gerard Maas, spark users
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

You can always spin up new boxes in the background and bring them into the cluster fold when fully operational, and time that with the job relaunch and parameter change.

Kafka offsets are managed automatically for you by the Kafka clients, which keep them in ZooKeeper – don't worry about that as long as you shut down your job gracefully. Besides, managing the offsets explicitly is not a big deal if necessary.

-------- Original message --------
From: Dmitry Goldenberg
Date: 2015/05/28 13:16 (GMT+00:00)
To: Evo Eftimov
Cc: Gerard Maas, spark users
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

Thanks, Evo. Per the last part of your comment, it sounds like we will need to implement a job manager which will be in control of starting the jobs, monitoring the status of the Kafka topic(s), shutting jobs down and marking them as ones to relaunch, scaling the cluster up/down by adding/removing machines, and relaunching the 'suspended' (shut down) jobs.

I suspect that relaunching the jobs may be tricky, since that means keeping track of the starting offsets in the Kafka topic(s) from which the jobs began working.

Ideally, we'd want to avoid a relaunch. The 'suspension' and relaunching of jobs, coupled with the wait for the new machines to come online, may turn out quite time-consuming, which will make for lengthy request times, and our requests are not asynchronous. Ideally, the currently running jobs would continue to run on the machines currently available in the cluster.

In the scale-down case, the job manager would want to signal to Spark's job scheduler not to send work to the node being taken out, find out when the last job has finished running on that node, then take the node out.

This is somewhat like changing the number of cylinders in a car engine while the car is running...

Sounds like a great candidate for a set of enhancements in Spark...
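A minimal sketch of the graceful shutdown that the offset discussion above depends on – stopGracefully lets already-queued batches finish before the context stops (the wrapper function is hypothetical):

    import org.apache.spark.streaming.StreamingContext

    // Suspend a streaming job so it can be relaunched later. With
    // stopGracefully = true, all data already received and queued as
    // batches is processed before the context shuts down, so the offsets
    // committed by the Kafka client reflect completed work.
    def suspendJob(ssc: StreamingContext): Unit = {
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }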
On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov <evo.efti...@isecc.com> wrote:

@DG: the key metrics should be:

- Scheduling delay – its ideal state is to remain constant over time, and ideally be less than the duration of the micro-batch window
- The average job processing time – it should remain less than the micro-batch window
- Number of lost jobs – even a single lost job means that you have lost all messages for the DStream RDD processed by that job, due to the previously described Spark Streaming memory-leak condition and subsequent crash, described in previous postings submitted by me

You can even go one step further and periodically issue "get/check free memory" calls to see whether it is decreasing relentlessly at a constant rate – if it touches a predetermined RAM threshold, that should be an additional metric.

Re the "back pressure" mechanism – this is a feedback-loop mechanism, and you can implement one on your own without waiting for JIRAs and new features, whenever they might be implemented by the Spark dev team. Moreover, you can avoid using slow mechanisms such as ZooKeeper, and even incorporate some machine learning in your feedback loop to make it handle the message consumption rate more intelligently and benefit from ongoing online learning. BUT this is STILL about voluntarily sacrificing your performance in the name of keeping your system stable – it is not about scaling your system/solution.

In terms of how to scale the Spark framework dynamically – even though this is not supported out of the box at the moment, I guess you can have a sys-management framework dynamically spin up a few more boxes (Spark worker nodes), stop your currently running Spark Streaming job, and relaunch it with new params, e.g. more receivers, a larger number of partitions (hence tasks), more RAM per executor, etc. Obviously this will cause some temporary delay – in fact an interruption – in your processing, but if the business use case can tolerate that, then go for it.

From: Gerard Maas [mailto:gerard.m...@gmail.com]
Sent: Thursday, May 28, 2015 12:36 PM
To: dgoldenberg
Cc: spark users
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

Hi,

tl;dr At the moment (with a BIG disclaimer *) elastic scaling of Spark Streaming processes is not supported.

*Longer version.*

I assume that you are talking about Spark Streaming, as the discussion is about handling Kafka streaming data.

Then you have two things to consider: the streaming receivers and the Spark processing cluster.

Currently, the receiving topology is static. One receiver is allocated with each DStream instantiated, and it will use 1 core in the cluster. Once the StreamingContext is started, this topology cannot be changed, therefore the number of Kafka receivers is fixed for the lifetime of your DStream.

What we do is calculate the cluster capacity and use that as a fixed upper bound (with a margin) for the receiver throughput.

There's work in progress to add a reactive model to the receiver, where backpressure can be applied to handle overload conditions. See https://issues.apache.org/jira/browse/SPARK-7398
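On Evo's first two metrics, a minimal sketch of a StreamingListener that watches scheduling delay and processing time per batch (the class name and the alerting logic are illustrative):

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    class DelayWatcher(batchWindowMs: Long) extends StreamingListener {
      override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
        val info = batch.batchInfo
        val schedulingDelay = info.schedulingDelay.getOrElse(0L) // ms waiting in the queue
        val processingTime  = info.processingDelay.getOrElse(0L) // ms actually processing
        if (schedulingDelay > batchWindowMs || processingTime > batchWindowMs) {
          // Falling behind: a real implementation might alert, throttle,
          // or feed a scale-up decision here.
          println(s"behind: scheduling=$schedulingDelay ms, processing=$processingTime ms")
        }
      }
    }

    // Register before ssc.start():
    // ssc.addStreamingListener(new DelayWatcher(batchWindowMs = 10000L))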
Once the data is received, it will be processed in a 'classical' Spark pipeline, so previous posts on Spark resource scheduling might apply.

Regarding metrics, the standard metrics subsystem of Spark will report streaming job performance. Check the driver's metrics endpoint to peruse the available metrics:

<driver>:<ui-port>/metrics/json

-kr, Gerard.

(*) Spark is a project that moves so fast that statements might be invalidated by new work every minute.

On Thu, May 28, 2015 at 1:21 AM, dgoldenberg <dgoldenberg...@gmail.com> wrote:

Hi,

I'm trying to understand whether there are design patterns for autoscaling Spark (adding/removing slave machines to the cluster) based on throughput.

Assuming we can throttle Spark consumers, the respective Kafka topics we stream data from would start growing. What are some of the ways to generate metrics on the number of new messages and the rate at which they are piling up? This is perhaps more of a Kafka question; I see a pretty sparse javadoc with the Metric interface and not much else...

What are some of the ways to expand/contract the Spark cluster? Someone has mentioned Mesos...

I see some info on Spark metrics in the Spark monitoring guide <https://spark.apache.org/docs/latest/monitoring.html>. Do we want to perhaps implement a custom sink that would help us autoscale up or down based on the throughput?
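On the custom-sink question, a simpler starting point might be to poll the endpoint Gerard mentions from an external job manager. A minimal sketch, assuming the default UI port and leaving the JSON parsing as a stub (the host name is hypothetical):

    import scala.io.Source

    // Poll the driver's metrics servlet. 4040 is only the default UI port;
    // host and port are deployment-specific (hypothetical here).
    def fetchMetricsJson(driverHost: String, uiPort: Int = 4040): String = {
      val url = s"http://$driverHost:$uiPort/metrics/json"
      val src = Source.fromURL(url)
      try src.mkString finally src.close()
    }

    // An external job manager could parse this JSON with any JSON library
    // and feed the streaming gauges into its scale-up/scale-down loop.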