For #3, a better example would be in ConsumerCoordinator (around line 632):

commitOffsetsAsync(allConsumedOffsets, new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                           Exception exception) {
        // ... (exception is non-null here when the commit failed)
    }
});
FYI
On Mon, Feb 19, 2018 at 10:56 AM, Gabriel Giussi <[email protected]>
wrote:
> Hi Ted,
> my mistake was believing that committed offsets are used on the next
> poll, but that is not the case
> <https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c14018cf7d3a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1202>.
>
> > The offsets committed using this API will be used on the first fetch
> > after every rebalance and also on startup
> >
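> For example (my own sketch, imports elided; "consumer" is assumed to be
> an already-subscribed KafkaConsumer<String, String> and process() is
> placeholder application code):
>
>     ConsumerRecords<String, String> batch = consumer.poll(100);
>     process(batch);
>     consumer.commitSync();
>     // The next poll() resumes from the in-memory position whether or not
>     // the commit above succeeded; the committed offset is consulted only
>     // on the first fetch after a rebalance or on startup.
>     ConsumerRecords<String, String> next = consumer.poll(100);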
>
> So, I guess what to do after a failed commit depends on the nature of the
> exception, along the lines of the sketch below:
>
> - WakeupException: retry
> - Others: close the consumer
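>
> For example (a sketch, imports elided; commitSync is used here because
> commitAsync reports failures through its callback rather than by
> throwing):
>
>     try {
>         consumer.commitSync();
>     } catch (WakeupException e) {
>         consumer.commitSync();   // the wakeup flag was consumed, retry once
>     } catch (KafkaException e) {
>         consumer.close();        // anything else: give up and close
>         throw e;
>     }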
>
> Thanks for your help solving #2. I'm still wondering about #1 and #3.
>
> 2018-02-19 11:46 GMT-03:00 Ted Yu <[email protected]>:
>
> > For #2, I think the assumption is that the records are processed by the
> > loop:
> >
> > https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c14018cf7d3a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java#L164
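> >
> > i.e. each poll() hands the buffered records out exactly once. A rough
> > sketch of that assumption (my own example, imports elided; the topic
> > name "t" is made up):
> >
> >     MockConsumer<String, String> consumer =
> >         new MockConsumer<>(OffsetResetStrategy.EARLIEST);
> >     TopicPartition tp = new TopicPartition("t", 0);
> >     consumer.assign(Collections.singletonList(tp));
> >     consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
> >     consumer.addRecord(new ConsumerRecord<>("t", 0, 0L, "key", "value"));
> >     consumer.poll(0).count();   // 1: the record is returned once...
> >     consumer.poll(0).count();   // 0: ...then cleared, as if processed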
> >
> > On Mon, Feb 19, 2018 at 4:39 AM, Gabriel Giussi <[email protected]>
> > wrote:
> >
> > > Hi,
> > >
> > > I'm trying to use MockConsumer to test my application code, but I've
> > > faced a couple of limitations and I want to know if there are
> > > workarounds or something that I'm overlooking.
> > > Note: I'm using kafka-clients v0.11.0.2.
> > >
> > >
> > > 1. Why does addRecord
> > > <https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c14018cf7d3a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java#L179>
> > > require that the consumer has assigned partitions, given that this is
> > > just simulating records being produced or already-existing records?
> > > 2. Why does poll
> > > <https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c14018cf7d3a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java#L132>
> > > clear the map of records? Shouldn't it be cleared after a commit
> > > instead?
> > > 3. Why doesn't commitAsync
> > > <https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c14018cf7d3a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java#L198>
> > > check for an exception? It always succeeds (see the sketch after this
> > > list).
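> > >
> > > To make (1) and (3) concrete, this is roughly what I observe against
> > > 0.11.0.2 (a sketch, imports elided; the topic name "t" is made up):
> > >
> > >     MockConsumer<String, String> consumer =
> > >         new MockConsumer<>(OffsetResetStrategy.EARLIEST);
> > >     // (3): the commit callback is always invoked with a null exception
> > >     consumer.commitAsync((offsets, exception) -> {
> > >         assert exception == null;
> > >     });
> > >     // (1): without a prior assign(), addRecord throws
> > >     // IllegalStateException instead of just buffering the record
> > >     consumer.addRecord(new ConsumerRecord<>("t", 0, 0L, "key", "value"));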
> > >
> > > Due to items (2) and (3), I'm not able to test scenarios where the
> > > commit fails and the consumer should poll the same records again.
> > >
> > > If someone knows about other scenarios that can't be tested with
> > > MockConsumer, please let me know.
> > >
> > > Thanks.
> > >
> >
>