On 10/13/2016 04:35 PM, Cody Koeninger wrote:
So I see in the logs that PIDRateEstimator is choosing a new rate, and
the rate it's choosing is 100.
But it's always choosing 100, while all the other variables (processing
time, latestRate, etc.) change.
Also, the number of records per batch is always the same, even though the rate is 100.

That happens to be the default minimum of an (apparently undocumented) setting,


Try setting that to 1 and see if there's different behavior.
Same behavior: it always chooses the same rate, and the number of records per
batch does not change.
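A rate that is always exactly 100 is what you'd see if the estimator's output were being clamped at a floor of 100. A minimal pure-Scala sketch of that clamping (the clamp-as-final-step is an illustration of the idea, not code taken from Spark):

```scala
// Sketch: if the estimator's last step is "never go below minRate",
// any computed rate under the floor is reported as exactly the floor.
def clampRate(computedRate: Double, minRate: Double): Double =
  math.max(computedRate, minRate)

clampRate(0.95, 100.0)  // -> 100.0: every update "chooses" 100
clampRate(0.95, 1.0)    // -> 1.0: with a floor of 1, still pinned at the floor
clampRate(250.0, 100.0) // -> 250.0: only rates above the floor vary
```

This would explain both observations: the rate is pinned at the floor, and lowering the floor just pins it at the new floor if real throughput is below it.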

BTW, how many kafka partitions are you using, and how many actually
have data for a given batch?
3 partitions.
All of them have more than maxRatePerPartition records (my topic has hundreds
of millions of records).

On Thu, Oct 13, 2016 at 4:33 AM, Samy Dindane <s...@dindane.com> wrote:
Hey Cody,

Thanks for the reply. Really helpful.

Following your suggestion, I set spark.streaming.backpressure.enabled to
true and maxRatePerPartition to 100000.
I know I can handle 100k records at once, but definitely not in 1 second (the
batchDuration), so I expect the backpressure to lower that rate.

Unfortunately the backpressure doesn't work and I keep getting 100k records
per batch.

Here is my output log:
And this is my conf:

    conf.set("spark.streaming.kafka.consumer.poll.ms", "30000")
    conf.set("spark.streaming.kafka.maxRatePerPartition", "100000")
    conf.set("spark.streaming.backpressure.enabled", "true")

That's not normal, is it? Do you notice anything odd in my logs?

Thanks a lot.

On 10/12/2016 07:31 PM, Cody Koeninger wrote:

Cool, just wanted to make sure.

To answer your question about

Isn't "spark.streaming.backpressure.initialRate" supposed to do this?

that configuration was added well after the integration of the direct
stream with the backpressure code, and was added only to the receiver
code, which the direct stream doesn't share since it isn't a receiver.
Not making excuses about it being confusing, just explaining how
things ended up that way :(  So yeah, maxRatePerPartition is the
closest thing you have on the direct stream side to being able to
limit before the backpressure estimator has something to work with.
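To illustrate the point about the two limits, here is a hedged sketch (it mirrors the idea Cody describes, not Spark's actual internals): the effective per-partition allowance for a batch is the smaller of maxRatePerPartition and the estimator's rate split across partitions.

```scala
// Sketch: combining the static per-partition cap with a backpressure estimate.
// The result is records admitted per partition for one batch.
def effectivePerPartitionLimit(
    maxRatePerPartition: Long,   // spark.streaming.kafka.maxRatePerPartition
    estimatedRate: Option[Long], // records/sec from the rate estimator, if any
    numPartitions: Int,
    batchMillis: Long): Long = {
  val staticRate = maxRatePerPartition              // records/sec per partition
  val bpRate = estimatedRate.map(_ / numPartitions) // split across partitions
  val ratePerPartition = bpRate.fold(staticRate)(math.min(staticRate, _))
  ratePerPartition * batchMillis / 1000
}

// Before the estimator has produced anything, only the static cap applies:
effectivePerPartitionLimit(100000L, None, 3, 1000)        // -> 100000 per partition
// Once backpressure kicks in with, say, 1500 records/sec total:
effectivePerPartitionLimit(100000L, Some(1500L), 3, 1000) // -> 500 per partition
```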

So to try and debug what you're seeing, if you add a line like this to
your log4j.properties


you should start seeing log lines like

16/10/12 12:18:01 TRACE PIDRateEstimator:
time = 1476292681092, # records = 20, processing time = 20949,
scheduling delay = 6
16/10/12 12:18:01 TRACE PIDRateEstimator:
latestRate = -1.0, error = -1.9546995083297531
latestError = -1.0, historicalError = 0.001145639409995704
delaySinceUpdate = 1.476292681093E9, dError = -6.466871512381435E-10

and then once it updates, lines like

16/10/12 12:18:32 TRACE PIDRateEstimator: New rate = 1.0
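The numbers in those TRACE lines are consistent with a textbook PID update. The following pure-Scala sketch reproduces them; the gains (1.0 / 0.2 / 0.0) and the 5000 ms batch interval are assumptions inferred from the logged values, not stated anywhere in the thread.

```scala
// Sketch of the PID rate update implied by the TRACE output above.
// Rates are in records/sec; delays in milliseconds unless noted.
case class PidInputs(
    numRecords: Long, processingDelayMs: Long, schedulingDelayMs: Long,
    latestRate: Double, latestError: Double, delaySinceUpdateSec: Double)

def pidUpdate(in: PidInputs, batchIntervalMs: Long, minRate: Double,
              kp: Double = 1.0, ki: Double = 0.2, kd: Double = 0.0): Double = {
  val processingRate = in.numRecords.toDouble / in.processingDelayMs * 1000
  val error = in.latestRate - processingRate
  // scheduling delay contributes a "historical" (integral-like) error term:
  val historicalError = in.schedulingDelayMs * processingRate / batchIntervalMs
  val dError = (error - in.latestError) / in.delaySinceUpdateSec
  val newRate = in.latestRate - kp * error - ki * historicalError - kd * dError
  math.max(newRate, minRate) // never report below the configured minimum
}

// Plugging in the values from the TRACE lines above:
val in = PidInputs(20, 20949, 6, latestRate = -1.0, latestError = -1.0,
                   delaySinceUpdateSec = 1.476292681093e9)
pidUpdate(in, batchIntervalMs = 5000, minRate = 1.0)   // -> 1.0, as logged
pidUpdate(in, batchIntervalMs = 5000, minRate = 100.0) // -> 100.0, clamped
```

Note the unclamped result is about 0.95 records/sec, so with the default floor of 100 the reported rate would sit at 100 on every update, which matches the behavior described at the top of the thread.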

For a really artificially constrained example where
maxRatePerPartition is set such that it limits to 20 per batch but the
system can really only handle 5 per batch, the streaming UI will look
something like this:


notice the cutover point

On Wed, Oct 12, 2016 at 11:00 AM, Samy Dindane <s...@dindane.com> wrote:

I am 100% sure.

println(conf.get("spark.streaming.backpressure.enabled")) prints true.

On 10/12/2016 05:48 PM, Cody Koeninger wrote:

Just to make 100% sure, did you set

spark.streaming.backpressure.enabled

to true?

On Wed, Oct 12, 2016 at 10:09 AM, Samy Dindane <s...@dindane.com> wrote:

On 10/12/2016 04:40 PM, Cody Koeninger wrote:

How would backpressure know anything about the capacity of your system
on the very first batch?

Isn't "spark.streaming.backpressure.initialRate" supposed to do this?

You should be able to set maxRatePerPartition at a value that makes
sure your first batch doesn't blow things up, and let backpressure
scale from there.

Backpressure doesn't scale even when using maxRatePerPartition: when I
enable backpressure and set maxRatePerPartition to n, I always get n
records, even if my batch takes longer than batchDuration to finish.

* I set batchDuration to 1 sec: val ssc = new StreamingContext(conf, Seconds(1))
* I set backpressure.initialRate and/or maxRatePerPartition to 100,000 and
enable backpressure
* Since I can't handle 100,000 records in 1 second, I expect the backpressure
to kick in on the second batch and get fewer than 100,000 records; this does
not happen

What am I missing here?
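For scale, the static ceilings in that experiment work out as follows (simple arithmetic; the per-partition interpretation of the cap is assumed from the setting's name, and the partition count comes from elsewhere in the thread):

```scala
// What the static cap alone admits per batch, before backpressure acts:
val maxRatePerPartition = 100000L // records/sec per partition (configured)
val partitions = 3L               // topic partition count stated in the thread
val batchSeconds = 1L             // batchDuration

val perPartitionCap = maxRatePerPartition * batchSeconds // 100000 records
val totalCap = perPartitionCap * partitions              // 300000 records per batch
```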

On Wed, Oct 12, 2016 at 8:53 AM, Samy Dindane <s...@dindane.com>

That's what I was looking for, thank you.

Unfortunately, neither

* spark.streaming.backpressure.initialRate
* spark.streaming.backpressure.enabled
* spark.streaming.receiver.maxRate
* spark.streaming.receiver.initialRate

change how many records I get (I tried many different combinations).

The only configuration that works is spark.streaming.kafka.maxRatePerPartition.
That's better than nothing, but it'd be useful to have backpressure
for automatic scaling.

Do you have any idea why backpressure isn't working, and how to fix it?

On 10/11/2016 06:08 PM, Cody Koeninger wrote:


"This rate is upper bounded by the values
spark.streaming.receiver.maxRate and
spark.streaming.kafka.maxRatePerPartition if they are set (see below)."

On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane <s...@dindane.com>


Is it possible to limit the size of the batches returned by the
consumer for Spark Streaming?
I am asking because the first batch I get has hundreds of millions of records,
and it takes ages to process and checkpoint them.

Thank you.


To unsubscribe e-mail: user-unsubscr...@spark.apache.org
