Ok, thank you very much. To pause all assigned partitions should work for us, I will try it.
On Thu, Mar 31, 2016 at 12:32 PM, Manikumar Reddy <[email protected] > wrote: > Hi, > > 1. New config property "max.poll.records" is getting introduced in > upcoming 0.10 release. > This property can be used to control the no. of records in each poll. > > 2. We can use the combination of ExecutorService/Processing Thread and > Pause/Resume API to handle unwanted rebalances. > > Some of these options are discussed here > > http://users.kafka.apache.narkive.com/4vvhuBZO/low-latency-high-message-size-variance > > Example code is here > > https://github.com/omkreddy/kafka-examples/blob/master/consumer/src/main/java/kafka/examples/consumer/advanced/AdvancedConsumer.java > > On Thu, Mar 31, 2016 at 3:43 PM, Daniel Fanjul < > [email protected]> wrote: > > > Hi all, > > > > My problem: If the consumer fetches too much data and the processing of > the > > records is not fast enough, commit() fails because there was a rebalance. > > > > I cannot reduce 'max.partition.fetch.bytes' because there might be large > > messages. > > > > I don't want to increase the 'session.timeout.ms', because it would be > too > > large to detect failures. > > > > I understand that the new consumer API only sends the heartbeats and > > manages rebalances during the call to poll(). But if I call poll(0), > there > > is still a chance it will return even more data. So I keep the heart > beats, > > but I may accumulate too much data, eventually leading to OOM. > > > > I would like something: > > foreach record in consumer.poll() { > > process(record) > > consumer.doHeartBeatsAndRebalanceSoKeepMeStillAlive() > > } > > > > Is this possible? > > >
