Keyboard error: my previous mail went out before I had finished it.

Something along these lines:

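records = consumer.poll()
foreach record:
  process record
  add record's offset + 1 to commit map (keyed by partition)
  if records processed since last commit > threshold:
    commit map
commit map  (whatever is left, before polling again)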

Take care to commit everything before calling poll() again, because
otherwise the driver would skip over the uncommitted records from the
previous poll.
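For illustration, here is a small Python sketch of the same loop. It does
not talk to Kafka at all: `Record`, `process_in_batches` and the `commit`
callback are hypothetical stand-ins. With the Java consumer the callback
would roughly correspond to consumer.commitSync(Map<TopicPartition,
OffsetAndMetadata>). Note the "+ 1": the committed offset should be the next
offset to consume, not the offset of the last record processed.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, Tuple

# Hypothetical stand-in for a consumed record: only the fields the sketch needs.
@dataclass(frozen=True)
class Record:
    topic: str
    partition: int
    offset: int
    value: str

# (topic, partition) pair, standing in for Kafka's TopicPartition.
TopicPartition = Tuple[str, int]


def process_in_batches(
    records: Iterable[Record],
    process: Callable[[Record], None],
    commit: Callable[[Dict[TopicPartition, int]], None],  # hypothetical commit hook
    threshold: int,
) -> None:
    """Process one poll()'s worth of records, committing every `threshold` records."""
    commit_map: Dict[TopicPartition, int] = {}
    pending = 0
    for record in records:
        process(record)
        # Store the offset of the *next* record to read for this partition.
        commit_map[(record.topic, record.partition)] = record.offset + 1
        pending += 1
        if pending >= threshold:
            commit(dict(commit_map))
            commit_map.clear()
            pending = 0
    # Flush the remainder so nothing is left uncommitted before the next poll().
    if commit_map:
        commit(dict(commit_map))


if __name__ == "__main__":
    commits = []
    batch = [
        Record("events", 0, 10, "a"),
        Record("events", 0, 11, "b"),
        Record("events", 1, 5, "c"),
    ]
    process_in_batches(batch, process=lambda r: None, commit=commits.append, threshold=2)
    print(commits)  # [{('events', 0): 12}, {('events', 1): 6}]
```

Committing a copy of the map and then clearing it means a partition is only
re-committed once new records from it have actually been processed.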

Martin


On 31 March 2016 at 16:10, Martin Skøtt wrote:

> We have recently had great success with committing records in smaller
> batches between poll()'s. Something along these lines:
>
> records = consumer.poll()
> foreach record:
>   process record
>
> On 31 March 2016 at 12:13, Daniel Fanjul wrote:
>
>> Hi all,
>>
>> My problem: if the consumer fetches too much data and the processing of
>> the records is not fast enough, commit() fails because there was a
>> rebalance.
>>
>> I cannot reduce 'max.partition.fetch.bytes' because there might be large
>> messages.
>>
>> I don't want to increase 'session.timeout.ms', because then it would take
>> too long to detect failures.
>>
>> I understand that the new consumer API only sends heartbeats and manages
>> rebalances during the call to poll(). But if I call poll(0), there is
>> still a chance it will return even more data. That keeps the heartbeats
>> going, but I may accumulate too much data, eventually leading to an OOM.
>>
>> I would like something like:
>> foreach record in consumer.poll() {
>>   process(record)
>>   consumer.doHeartBeatsAndRebalanceSoKeepMeStillAlive()
>> }
>>
>> Is this possible?
>>
>
>
