Cool, just wanted to make sure. To answer your question about
> Isn't "spark.streaming.backpressure.initialRate" supposed to do this? that configuration was added well after the integration of the direct stream with the backpressure code, and was added only to the receiver code, which the direct stream doesn't share since it isn't a receiver. Not making excuses about it being confusing, just explaining how things ended up that way :( So yeah, maxRatePerPartition is the closest thing you have on the direct stream side to being able to limit before the backpressure estimator has something to work with. So to try and debug what you're seeing, if you add a line like this to your log4j.properties log4j.logger.org.apache.spark.streaming.scheduler.rate=TRACE you should start seeing log lines like 16/10/12 12:18:01 TRACE PIDRateEstimator: time = 1476292681092, # records = 20, processing time = 20949, scheduling delay = 6 16/10/12 12:18:01 TRACE PIDRateEstimator: latestRate = -1.0, error = -1.9546995083297531 latestError = -1.0, historicalError = 0.001145639409995704 delaySinceUpdate = 1.476292681093E9, dError = -6.466871512381435E-10 and then once it updates, lines like 16/10/12 12:18:32 TRACE PIDRateEstimator: New rate = 1.0 For a really artificially constrained example where maxRatePerPartition is set such that it limits to 20 per batch but the system can really only handle 5 per batch, the streaming UI will look something like this: https://i.imgsafe.org/e730492453.png notice the cutover point On Wed, Oct 12, 2016 at 11:00 AM, Samy Dindane <s...@dindane.com> wrote: > I am 100% sure. > > println(conf.get("spark.streaming.backpressure.enabled")) prints true. > > > On 10/12/2016 05:48 PM, Cody Koeninger wrote: >> >> Just to make 100% sure, did you set >> >> spark.streaming.backpressure.enabled >> >> to true? >> >> On Wed, Oct 12, 2016 at 10:09 AM, Samy Dindane <s...@dindane.com> wrote: >>> >>> >>> >>> On 10/12/2016 04:40 PM, Cody Koeninger wrote: >>>> >>>> >>>> How would backpressure know anything about the capacity of your system >>>> on the very first batch? >>> >>> >>> Isn't "spark.streaming.backpressure.initialRate" supposed to do this? >>>> >>>> >>>> >>>> You should be able to set maxRatePerPartition at a value that makes >>>> sure your first batch doesn't blow things up, and let backpressure >>>> scale from there. >>> >>> >>> Backpressure doesn't scale even when using maxRatePerPartition: when I >>> enable backpressure and set maxRatePerPartition to n, I always get n >>> records, even if my batch takes longer than batchDuration to finish. >>> >>> Example: >>> * I set batchDuration to 1 sec: `val ssc = new StreamingContext(conf, >>> Durations.seconds(1))` >>> * I set backpressure.initialRate and/or maxRatePerPartition to 100,000 >>> and >>> enable backpressure >>> * Since I can't handle 100,000 records in 1 second, I expect the >>> backpressure to kick in in the second batch, and get less than 100,000; >>> but >>> this does not happen >>> >>> What am I missing here? >>> >>> >>> >>>> >>>> On Wed, Oct 12, 2016 at 8:53 AM, Samy Dindane <s...@dindane.com> wrote: >>>>> >>>>> >>>>> That's what I was looking for, thank you. >>>>> >>>>> Unfortunately, neither >>>>> >>>>> * spark.streaming.backpressure.initialRate >>>>> * spark.streaming.backpressure.enabled >>>>> * spark.streaming.receiver.maxRate >>>>> * spark.streaming.receiver.initialRate >>>>> >>>>> change how many records I get (I tried many different combinations). >>>>> >>>>> The only configuration that works is >>>>> "spark.streaming.kafka.maxRatePerPartition". >>>>> That's better than nothing, but I'd be useful to have backpressure >>>>> enabled >>>>> for automatic scaling. >>>>> >>>>> Do you have any idea about why aren't backpressure working? How to >>>>> debug >>>>> this? >>>>> >>>>> >>>>> On 10/11/2016 06:08 PM, Cody Koeninger wrote: >>>>>> >>>>>> >>>>>> >>>>>> http://spark.apache.org/docs/latest/configuration.html >>>>>> >>>>>> "This rate is upper bounded by the values >>>>>> spark.streaming.receiver.maxRate and >>>>>> spark.streaming.kafka.maxRatePerPartition if they are set (see >>>>>> below)." >>>>>> >>>>>> On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane <s...@dindane.com> >>>>>> wrote: >>>>>>> >>>>>>> >>>>>>> >>>>>>> Hi, >>>>>>> >>>>>>> Is it possible to limit the size of the batches returned by the Kafka >>>>>>> consumer for Spark Streaming? >>>>>>> I am asking because the first batch I get has hundred of millions of >>>>>>> records >>>>>>> and it takes ages to process and checkpoint them. >>>>>>> >>>>>>> Thank you. >>>>>>> >>>>>>> Samy >>>>>>> >>>>>>> --------------------------------------------------------------------- >>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >>>>>>> >>>>> >>> > --------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org