Ok, thank you very much. Pausing all assigned partitions should work for
us; I will try it.
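For anyone who finds this thread later, here is a rough sketch of the pattern I am going to try. Since a real KafkaConsumer needs a broker, the snippet uses a tiny in-memory stand-in with the same pause()/resume()/poll() shape (everything in FakeConsumer is invented for illustration); with the actual 0.9 client, pause() and resume() take TopicPartition varargs, e.g. consumer.pause(consumer.assignment().toArray(new TopicPartition[0])).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// In-memory stand-in for KafkaConsumer: pause()/resume()/poll() mimic the
// real client's behavior (poll() returns no records for paused partitions
// but still counts as liveness activity toward the group coordinator).
class FakeConsumer {
    private final Set<String> assignment = new HashSet<>(Arrays.asList("topic-0", "topic-1"));
    private final Set<String> paused = new HashSet<>();
    private final Deque<String> pending = new ArrayDeque<>(Arrays.asList("r1", "r2", "r3"));

    Set<String> assignment() { return assignment; }
    void pause(Collection<String> partitions) { paused.addAll(partitions); }
    void resume(Collection<String> partitions) { paused.removeAll(partitions); }

    List<String> poll(long timeoutMs) {
        // With everything paused, poll() only "heartbeats" and returns nothing.
        if (paused.containsAll(assignment) || pending.isEmpty()) {
            return Collections.emptyList();
        }
        return Collections.singletonList(pending.poll());
    }
}

public class PauseResumeDemo {
    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        List<String> processed = new ArrayList<>();
        while (processed.size() < 3) {
            List<String> records = consumer.poll(100);
            if (records.isEmpty()) continue;
            // Got a batch: pause all partitions so we can keep calling poll()
            // (staying alive in the group) without fetching more data.
            consumer.pause(consumer.assignment());
            for (String record : records) {
                processed.add(record);  // slow processing goes here
                consumer.poll(0);       // returns no records while paused
            }
            consumer.resume(consumer.assignment()); // ready for the next batch
        }
        System.out.println(processed); // [r1, r2, r3]
    }
}
```

The key point is that poll() keeps being called during processing, so the consumer never misses the session timeout, yet no new records pile up in memory.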

On Thu, Mar 31, 2016 at 12:32 PM, Manikumar Reddy <[email protected]>
wrote:

> Hi,
>
> 1. A new config property "max.poll.records" is being introduced in the
> upcoming 0.10 release.
>    This property can be used to control the number of records returned by
> each poll().
>
> 2. We can use a combination of an ExecutorService/processing thread and
> the pause/resume API to handle unwanted rebalances.
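A hedged sketch of point 1, as a configuration fragment (the broker address, group id, and the 100-record cap are all placeholder values, and "max.poll.records" only takes effect on 0.10+ clients):

```java
import java.util.Properties;

public class MaxPollRecordsConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "my-group");                // placeholder group
        // New in 0.10: cap how many records a single poll() may return,
        // so per-iteration processing time stays bounded.
        props.put("max.poll.records", "100");
        System.out.println(props.getProperty("max.poll.records")); // prints 100
        // A 0.10 client built as new KafkaConsumer<>(props) would pick this up.
    }
}
```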
>
> Some of these options are discussed here
>
> http://users.kafka.apache.narkive.com/4vvhuBZO/low-latency-high-message-size-variance
>
> Example code is here
>
> https://github.com/omkreddy/kafka-examples/blob/master/consumer/src/main/java/kafka/examples/consumer/advanced/AdvancedConsumer.java
>
> On Thu, Mar 31, 2016 at 3:43 PM, Daniel Fanjul <[email protected]>
> wrote:
>
> > Hi all,
> >
> > My problem: If the consumer fetches too much data and the processing of
> > the records is not fast enough, commit() fails because there was a
> > rebalance.
> >
> > I cannot reduce 'max.partition.fetch.bytes' because there might be large
> > messages.
> >
> > I don't want to increase the 'session.timeout.ms', because it would be
> > too large to detect failures.
> >
> > I understand that the new consumer API only sends the heartbeats and
> > manages rebalances during the call to poll(). But if I call poll(0),
> > there is still a chance it will return even more data. So I keep the
> > heartbeats, but I may accumulate too much data, eventually leading to
> > OOM.
> >
> > I would like something like:
> > foreach record in consumer.poll() {
> >   process(record)
> >   consumer.doHeartBeatsAndRebalanceSoKeepMeStillAlive()
> > }
> >
> > Is this possible?
> >
>
