Hey Cody,

Thanks for the reply. Really helpful.

Following your suggestion, I set spark.streaming.backpressure.enabled to true 
and maxRatePerPartition to 100000.
I know I can handle 100k records at the same time, but definitely not in 1 
second (the batchDuration), so I expect the backpressure to lower that number.

Unfortunately the backpressure doesn't work and I keep getting 100k records per 

Here is my output log: 
And this is my conf:

    conf.set("spark.streaming.kafka.consumer.poll.ms", "30000")
    conf.set("spark.streaming.kafka.maxRatePerPartition", "100000")
    conf.set("spark.streaming.backpressure.enabled", "true")

That's not normal, is it? Do you notice anything odd in my logs?

Thanks a lot.

On 10/12/2016 07:31 PM, Cody Koeninger wrote:
Cool, just wanted to make sure.

To answer your question about

Isn't "spark.streaming.backpressure.initialRate" supposed to do this?

that configuration was added well after the integration of the direct
stream with the backpressure code, and was added only to the receiver
code, which the direct stream doesn't share since it isn't a receiver.
Not making excuses about it being confusing, just explaining how
things ended up that way :(  So yeah, maxRatePerPartition is the
closest thing you have on the direct stream side to being able to
limit before the backpressure estimator has something to work with.
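To make that cap concrete, here is a minimal sketch of the arithmetic, not the actual direct stream code. The object and method names are hypothetical, and it treats the backpressure estimate as already being a per-partition rate, which is a simplification.

```scala
// Illustrative only: BatchCapSketch and its methods are hypothetical names,
// not part of the Spark API.
object BatchCapSketch {

  /** Upper bound on records in one batch when only maxRatePerPartition applies. */
  def maxRecordsPerBatch(maxRatePerPartition: Long,
                         numPartitions: Int,
                         batchSeconds: Double): Long =
    (maxRatePerPartition * numPartitions * batchSeconds).toLong

  /** Per-partition rate once an estimate exists: the estimate can only lower the cap.
    * Assumption: the estimate is already expressed per partition. */
  def effectiveRate(estimatedRate: Option[Double],
                    maxRatePerPartition: Long): Double =
    estimatedRate.fold(maxRatePerPartition.toDouble)(r =>
      math.min(r, maxRatePerPartition.toDouble))
}
```

With maxRatePerPartition at 100000 and a 1 second batch, a single partition is capped at 100000 records per batch. Until the estimator has produced a rate (`None` here), that cap is the only limit in play.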

So to try and debug what you're seeing, if you add a line like this to
your log4j.properties


you should start seeing log lines like

16/10/12 12:18:01 TRACE PIDRateEstimator:
time = 1476292681092, # records = 20, processing time = 20949,
scheduling delay = 6
16/10/12 12:18:01 TRACE PIDRateEstimator:
latestRate = -1.0, error = -1.9546995083297531
latestError = -1.0, historicalError = 0.001145639409995704
delaySinceUpdate = 1.476292681093E9, dError = -6.466871512381435E-10

and then once it updates, lines like

16/10/12 12:18:32 TRACE PIDRateEstimator: New rate = 1.0
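For reading those TRACE lines, here is a simplified sketch of how the logged terms relate to each other. It is not the Spark source. The names are hypothetical, and the 5000 ms batch interval, the initial values of -1, the gains (1.0, 0.2, 0.0) and the minimum rate of 100 are assumptions; the batch interval is inferred from the sample numbers above.

```scala
// Illustrative only: PidTerms and PidSketch are hypothetical names.
final case class PidTerms(processingRate: Double,
                          error: Double,
                          historicalError: Double,
                          delaySinceUpdate: Double,
                          dError: Double)

object PidSketch {

  /** Computes the intermediate terms that appear in the TRACE output. */
  def terms(timeMs: Long, numRecords: Long, processingDelayMs: Long,
            schedulingDelayMs: Long, latestTimeMs: Long, latestRate: Double,
            latestError: Double, batchIntervalMs: Long): PidTerms = {
    val delaySinceUpdate = (timeMs - latestTimeMs).toDouble / 1000        // seconds
    val processingRate   = numRecords.toDouble / processingDelayMs * 1000 // records/s
    val error            = latestRate - processingRate
    val historicalError  = schedulingDelayMs.toDouble * processingRate / batchIntervalMs
    val dError           = (error - latestError) / delaySinceUpdate
    PidTerms(processingRate, error, historicalError, delaySinceUpdate, dError)
  }

  /** Combines the terms into a new rate; gains and minRate are assumed values. */
  def newRate(t: PidTerms, latestRate: Double, kp: Double = 1.0,
              ki: Double = 0.2, kd: Double = 0.0, minRate: Double = 100.0): Double =
    math.max(latestRate - kp * t.error - ki * t.historicalError - kd * t.dError, minRate)
}
```

Calling `PidSketch.terms(1476292681092L, 20L, 20949L, 6L, -1L, -1.0, -1.0, 5000L)` gives error, historicalError, delaySinceUpdate and dError values that agree with the sample log lines. If the logged rate never drops below what maxRatePerPartition allows, the minimum rate is one thing worth checking.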

For a really artificially constrained example where
maxRatePerPartition is set such that it limits to 20 per batch but the
system can really only handle 5 per batch, the streaming UI will look
something like this:


notice the cutover point

On Wed, Oct 12, 2016 at 11:00 AM, Samy Dindane <s...@dindane.com> wrote:
I am 100% sure.

println(conf.get("spark.streaming.backpressure.enabled")) prints true.

On 10/12/2016 05:48 PM, Cody Koeninger wrote:

Just to make 100% sure, did you set


to true?

On Wed, Oct 12, 2016 at 10:09 AM, Samy Dindane <s...@dindane.com> wrote:

On 10/12/2016 04:40 PM, Cody Koeninger wrote:

How would backpressure know anything about the capacity of your system
on the very first batch?

Isn't "spark.streaming.backpressure.initialRate" supposed to do this?

You should be able to set maxRatePerPartition at a value that makes
sure your first batch doesn't blow things up, and let backpressure
scale from there.

Backpressure doesn't scale even when using maxRatePerPartition: when I
enable backpressure and set maxRatePerPartition to n, I always get n
records, even if my batch takes longer than batchDuration to finish.

* I set batchDuration to 1 sec: `val ssc = new StreamingContext(conf,
* I set backpressure.initialRate and/or maxRatePerPartition to 100,000 and
enable backpressure
* Since I can't handle 100,000 records in 1 second, I expect the
backpressure to kick in in the second batch, and get less than 100,000;
this does not happen

What am I missing here?

On Wed, Oct 12, 2016 at 8:53 AM, Samy Dindane <s...@dindane.com> wrote:

That's what I was looking for, thank you.

Unfortunately, none of

* spark.streaming.backpressure.initialRate
* spark.streaming.backpressure.enabled
* spark.streaming.receiver.maxRate
* spark.streaming.receiver.initialRate

changes how many records I get (I tried many different combinations).

The only configuration that works is
That's better than nothing, but it'd be useful to have backpressure
for automatic scaling.

Do you have any idea why backpressure isn't working? How to

On 10/11/2016 06:08 PM, Cody Koeninger wrote:


"This rate is upper bounded by the values
spark.streaming.receiver.maxRate and
spark.streaming.kafka.maxRatePerPartition if they are set (see

On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane <s...@dindane.com>


Is it possible to limit the size of the batches returned by the Kafka
consumer for Spark Streaming?
I am asking because the first batch I get has hundreds of millions of
and it takes ages to process and checkpoint them.

Thank you.


To unsubscribe e-mail: user-unsubscr...@spark.apache.org
