Keyboard error...
Something along these lines:
records = consumer.poll()
foreach record:
    process record
    add to commit map
    if records processed > threshold:
        commit map
Take care to make sure everything has been committed before calling poll()
again; otherwise the driver will skip over the uncommitted records from the
previous poll.
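A minimal Java sketch of the commit-map bookkeeping described above, under stated assumptions: `BatchedCommitter` and its methods are hypothetical names, partitions are plain "topic-partition" strings standing in for Kafka's `TopicPartition`, and the commit step is an injected callback so the example runs without a broker. In a real consumer that callback would presumably wrap `KafkaConsumer.commitSync(Map<TopicPartition, OffsetAndMetadata>)`. It stores `offset + 1` because Kafka expects the committed offset to be the offset of the next record to read.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical helper mirroring the "commit map" in the pseudocode above.
 * Keys are "topic-partition" strings standing in for Kafka's TopicPartition.
 */
class BatchedCommitter {
    private final Map<String, Long> pending = new HashMap<>();
    private final int threshold;
    // Assumption: in a real consumer this would wrap consumer.commitSync(offsets).
    private final Consumer<Map<String, Long>> commitFn;
    private int processedSinceCommit = 0;

    BatchedCommitter(int threshold, Consumer<Map<String, Long>> commitFn) {
        this.threshold = threshold;
        this.commitFn = commitFn;
    }

    /** Call once a record has been fully processed. */
    void markProcessed(String topicPartition, long offset) {
        // Kafka commits the offset of the NEXT record to read, hence offset + 1.
        pending.put(topicPartition, offset + 1);
        processedSinceCommit++;
        // Same condition as the pseudocode: commit once processed > threshold.
        if (processedSinceCommit > threshold) {
            flush();
        }
    }

    /** Commit everything still pending; call this before the next poll(). */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        commitFn.accept(new HashMap<>(pending)); // hand over a copy, then reset
        pending.clear();
        processedSinceCommit = 0;
    }

    public static void main(String[] args) {
        List<Map<String, Long>> commits = new ArrayList<>();
        BatchedCommitter committer = new BatchedCommitter(2, commits::add);
        // Simulated records returned by one poll(): {partition, offset}.
        String[][] batch = {{"events-0", "10"}, {"events-0", "11"}, {"events-1", "5"}, {"events-1", "6"}};
        for (String[] rec : batch) {
            // process(record) would happen here
            committer.markProcessed(rec[0], Long.parseLong(rec[1]));
        }
        committer.flush(); // drain the map before polling again
        System.out.println("commits: " + commits.size()); // prints "commits: 2"
    }
}
```

Calling `flush()` at the end of each batch means nothing uncommitted is left over when `poll()` is called again, which is what the warning above is about.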
Martin
On 31 March 2016 at 16:10, Martin Skøtt <[email protected]>
wrote:
> We have recently had great success with committing records in smaller
> batches between poll() calls. Something along these lines:
>
> records = consumer.poll()
> foreach record:
>     process record
>
> On 31 March 2016 at 12:13, Daniel Fanjul <[email protected]>
> wrote:
>
>> Hi all,
>>
>> My problem: if the consumer fetches too much data and the processing of the
>> records is not fast enough, commit() fails because a rebalance has occurred.
>>
>> I cannot reduce 'max.partition.fetch.bytes' because there might be large
>> messages.
>>
>> I don't want to increase 'session.timeout.ms', because failure detection
>> would then take too long.
>>
>> I understand that the new consumer API only sends heartbeats and manages
>> rebalances during the call to poll(). But if I call poll(0), there is still
>> a chance it will return even more data. So I keep the heartbeats going, but
>> I may accumulate too much data, eventually leading to an OOM.
>>
>> I would like something like:
>> foreach record in consumer.poll() {
>>     process(record)
>>     consumer.doHeartBeatsAndRebalanceSoKeepMeStillAlive()
>> }
>>
>> Is this possible?
>>
>
>
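A rough worked estimate of Daniel's memory concern. The consumer documentation of this era describes the maximum total memory used for a fetch request as the number of partitions times `max.partition.fetch.bytes`; the sketch below assumes each poll() whose records are still held in memory contributed at most that much. `FetchMemoryEstimate`, the partition count, and the 10 MiB fetch size are hypothetical, illustrative values, not figures from this thread.

```java
/** Rough upper-bound estimate of buffered fetch data; all inputs are illustrative assumptions. */
final class FetchMemoryEstimate {
    /** Worst case for a single fetch: every assigned partition returns a full max.partition.fetch.bytes. */
    static long perPollUpperBound(int assignedPartitions, long maxPartitionFetchBytes) {
        return Math.multiplyExact((long) assignedPartitions, maxPartitionFetchBytes);
    }

    /** Worst case if the records from several polls are held before being processed. */
    static long bufferedUpperBound(int assignedPartitions, long maxPartitionFetchBytes, int pollsBuffered) {
        return Math.multiplyExact(perPollUpperBound(assignedPartitions, maxPartitionFetchBytes), (long) pollsBuffered);
    }

    public static void main(String[] args) {
        int partitions = 50;                 // hypothetical assignment size
        long fetchBytes = 10L * 1024 * 1024; // hypothetical 10 MiB, sized for large messages
        System.out.println(perPollUpperBound(partitions, fetchBytes));     // prints 524288000
        System.out.println(bufferedUpperBound(partitions, fetchBytes, 3)); // prints 1572864000
    }
}
```

The bound grows linearly with both the partition count and the number of polls whose records are kept around, which is why repeatedly calling poll(0) without draining the returned records can exhaust the heap when the fetch size has to stay large.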