Is there any recommendation about header max size?

2024-04-16 Thread Gabriel Giussi
I have logic in my service that captures exceptions thrown during
message processing and produces a new message to a different topic with
information about the error. The idea is to leave the message unmodified,
i.e. produce the exact same bytes to this new topic, so I'm planning
on adding the Java exception as a header.
Looking at the documentation, a header value is just an array of bytes and
nothing is said about a maximum size, but is there any recommendation about it?
https://kafka.apache.org/documentation/#recordheader
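Something like the following is what I have in mind (just a sketch; the error
topic name, the header key and the way the exception is serialized are all
made up):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Re-publish the original payload untouched and attach the error as a header.
void forwardWithError(KafkaProducer<byte[], byte[]> producer,
                      ConsumerRecord<byte[], byte[]> failed,
                      Exception error) {
    ProducerRecord<byte[], byte[]> out =
        new ProducerRecord<>("errors-topic", failed.key(), failed.value());
    // The header value is just bytes; a full stack trace could easily be several KB,
    // which is what makes me wonder about a recommended maximum size.
    out.headers().add("x-exception", error.toString().getBytes(StandardCharsets.UTF_8));
    producer.send(out);
}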


Why kafka-transactions abort requires topic and partition?

2022-09-02 Thread Gabriel Giussi
I don't understand why the kafka-transactions CLI requires a topic and a
partition to abort a transaction, when a transaction could span not only
multiple partitions but multiple topics.
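If it helps frame the question, the Java Admin API equivalent (from KIP-664,
as far as I can tell) also targets one hanging transaction on one specific
topic-partition; a rough sketch, where all the ids are invented values:

import java.util.Properties;
import org.apache.kafka.clients.admin.AbortTransactionSpec;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (Admin admin = Admin.create(props)) {
    // A hanging transaction is identified per partition, by the producer that left it open.
    AbortTransactionSpec spec = new AbortTransactionSpec(
        new TopicPartition("output-topic", 0), // the topic and partition the CLI asks for
        12345L,     // producerId
        (short) 0,  // producerEpoch
        7);         // coordinatorEpoch
    admin.abortTransaction(spec).all().get();
}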
BTW, when I list the transactions with kafka-transactions list I'm getting
some with state EMPTY. Does this mean the transaction was started by a
producer but hasn't received any records yet? Otherwise it should be
ongoing, right?

Thanks.


Broker allows transactions with generation.id -1 and could lead to duplicates

2022-06-10 Thread Gabriel Giussi
I did the following test that allowed me to introduce a duplicate message
in the output topic.


1. Client A starts the consumer and the producer and holds a reference to
the current groupMetadata, which has generation.id -1 since the consumer
hasn't joined the group yet
2. Client A joins the group and gets assigned partitions 0 and 1
3. Client A polls a message with offset X from partition 1, produces to the
output topic and enters a long GC pause (before calling
sendOffsetsToTransaction)
4. Client B starts the consumer and the producer, also getting a reference
to groupMetadata with generation.id -1
5. Client B joins the group and gets assigned partition 1
6. Client B polls a message with offset X from partition 1, produces to the
output topic, sends offsets with generation.id -1, and commits successfully.
7. Client A comes back, sends offsets with generation.id -1 and commits
successfully

I did this test because it wasn't clear to me at which moment I had to get
the group metadata, and this seems like a bug to me, since the broker
shouldn't allow sending offsets with generation.id -1.
I know that the right way to do it is to ask for the metadata after each
poll, so that we always have the generation.id corresponding to the moment
the messages were polled from the broker (see the sketch below), but it
would be nice to get an error if we send generation.id -1. WDYT?
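To make the "metadata after each poll" part concrete, this is roughly the
loop I mean (just a sketch; the consumer, producer, running flag and output
topic are assumed to be set up elsewhere):

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    if (records.isEmpty()) continue;

    producer.beginTransaction();
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
    }
    // Ask for the group metadata *after* this poll, so its generation.id matches
    // the generation under which these records were fetched (never -1 here).
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
}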

Thanks.


Re: How it is safe to break message ordering but not idempotency after getting an OutOfOrderSequenceException?

2022-06-07 Thread Gabriel Giussi
Thanks for the answer, Matthias.
I still have doubts about the meaning of "risks reordering of sent record".
If I understood correctly, the example you gave is something like this:
1. Producer sends batch with sequence number X
2. That request gets lost in the network
3. Producer sends batch with sequence number X+1
4. Broker receives the batch with sequence number X+1, returns an error, and
the producer throws an OutOfOrderSequenceException

In that situation we could keep retrying the batch with sequence number X+1,
but we would keep getting an OutOfOrderSequenceException; ideally we would
also resend the batch with sequence number X and, once it is accepted, send
the one with X+1.
If what I'm saying is correct then I can't see how this can reorder the
messages. I mean, if both batches include messages being written to topic A,
could messages from the batch with sequence number X+1 end up being persisted
with a lower offset than the ones from the batch with sequence number X?
Does this question make sense?

On Tue, Jun 7, 2022 at 16:13, Matthias J. Sax ()
wrote:

> Yes, the broker de-dupes using the sequence number.
>
> But for example, if a sequence number is skipped, you could get this
> exception: the current batch of messages cannot be appended to the log,
> as one batch is missing, and the producer would need to re-send the
> previous/missing batch with lower sequence number before it can move to
> the "next" (ie current) batch.
>
> Does this make sense?
>
>
> -Matthias
>
> On 5/27/22 10:43 AM, Gabriel Giussi wrote:
> > The docs say
> > "This exception indicates that the broker received an unexpected sequence
> > number from the producer, which means that data may have been lost. If
> the
> > producer is configured for idempotence only (i.e. if enable.idempotence
> is
> > set and no transactional.id is configured), it is possible to continue
> > sending with the same producer instance, but doing so risks reordering of
> > sent record"
> >
> > Isn't the broker using the monotonically increasing sequence number to
> > dedup messages? So how can it break message ordering without breaking
> > idempotency?
> > I can't see an example scenario where this could happen, I guess
> > the OutOfOrderSequenceException can only happen
> > with max.in.flight.requests.per.connection > 1, but even in that case why
> > are not going to keep getting an OutOfOrderSequenceException but instead
> a
> > success that broke message ordering?
> >
> > Thanks.
> >
>


Re: What role plays transactional.id after KIP-447?

2022-06-02 Thread Gabriel Giussi
"I think we may overlooked it in documentation to emphasize that, in case
1), it should not expect ProducerFencedException. If so, we can fix the
javadoc."

IMHO that would be nice. I'm reviewing an existing codebase where we were
only handling ProducerFencedException, because the javadoc and the method
signature are explicit only about that, and CommitFailedException is not
even mentioned but falls under the general KafkaException.
I think this could happen in both sendOffsetsToTransaction and
commitTransaction, right? (A sketch of the handling I mean is below.)
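To make the handling concrete, this is roughly the shape of the code I'm
reviewing, with my reading of which exceptions can show up (just a sketch;
offsets, consumer and producer are assumed to exist):

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

try {
    producer.beginTransaction();
    // ... produce the output records ...
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Fenced via transactional.id/epoch: this producer is unusable, close it.
    producer.close();
} catch (CommitFailedException e) {
    // Fenced via the consumer group generation (the KIP-447 path): also give up
    // on this producer, even though the javadoc only advertises KafkaException.
    producer.close();
} catch (KafkaException e) {
    // Any other transactional error: abort and retry in a new transaction.
    producer.abortTransaction();
}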

Thanks.

On Tue, May 31, 2022 at 14:49, Guozhang Wang ()
wrote:

> The CommitFailedException should be expected, since the fencing happens at
> the consumer coordinator. I.e. we can only fence the consumer-producer pair
> by the consumer's generation, but we cannot do so since there's no other
> producer who has just grabbed the same txn.id and bumped the producer
> epoch.
>
> So to just clarify, when the zombie comes back, it could be fenced either
> when:
>
> 1) it tries to complete the ongoing transaction via `sendOffset`, in which
> it would see the CommitFailedException. The caller is then responsible to
> handle the thrown exception that indicates being fenced.
> 2) it tries to heartbeat in the background thread, and got an
> InvalidGeneration error code, in which it would trigger the
> onPartitionsLost. The callback impl class is then responsible to handle
> that case which indicates being fenced.
>
> I think we may overlooked it in documentation to emphasize that, in case
> 1), it should not expect ProducerFencedException. If so, we can fix the
> javadoc.
>
>
>
>
> On Tue, May 31, 2022 at 8:26 AM Gabriel Giussi 
> wrote:
>
> > But there is no guarantee that the onPartitionsLost callback will be
> called
> > before a zombie producer coming back to life tries to continue with the
> > transaction, e.g. sending offsets or committing, so I should handle the
> > exception first and I could directly create a new producer there instead
> of
> > doing in the callback.
> > The curious part for me is that I was able to reproduce a case that
> > simulates a zombie producer that will try to send offsets after a
> rebalance
> > but instead of failing with a ProducerFencedException is failing with a
> > CommitFailedException with this message "Transaction offset Commit failed
> > due to consumer group metadata mismatch: Specified group generation id is
> > not valid.", which makes sense but is not even documented in the
> > KafkaProducer#sendOffsetsToTransaction.
> > Is this the expected behaviour or it should fail with a
> > ProducerFencedException when the generation.id is outdated?
> > The case I reproduced is like this
> > 1. Consumer A subscribes to topic X partitions 0 and 1, and starts a
> > producer with transactional.id = "tid.123"
> > 2. Consumes message from partition 1 and sends it to another thread to be
> > consumed (so the poll thread is not blocked)
> > 3. Producer A begins a transaction, sends to output topic and gets
> blocked
> > (I'm using a lock here to simulate a long processing) before calling
> > sendOffsetsToTransaction
> > 4. Consumer B is created and gets assigned partition 1 (I'm using
> > CooperativeStickyAssignor) and creates a producer with transactional.id
> =
> > "tid.456"
> > 5. Consumer B fetches the same message, processes it and commits the
> > transaction successfully
> > 6. Producer A calls sendOffsetsToTransaction (because the lock was
> > released) and fails with CommitFailedException
> >
> > This behaviour reflects what is described here
> >
> >
> https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
> > ,
> > but I was actually expecting a ProducerFencedException instead. Does that
> > exception only correspond to fencing done by transactional.id?
> >
> > Thanks
> >
> > On Tue, May 24, 2022 at 20:30, Guozhang Wang ()
> > wrote:
> >
> > > No problem.
> > >
> > > The key is that at step 4, when the consumer re-joins it will be aware
> > that
> > > it has lost its previously assigned partitions and will trigger
> > > `onPartitionsLost` on the rebalance callback. And since in your
> scenario
> > > it's a 1-1 mapping from consumer to producer, it means the producer has
> > > been fenced and hence should be closed.
> > >
> > > So in that step 4, the old producer with Client A should be closed
> within
> > > the rebalance callback, and then one can create a new producer to pair
> > with
> > > the

Re: What role plays transactional.id after KIP-447?

2022-05-31 Thread Gabriel Giussi
But there is no guarantee that the onPartitionsLost callback will be called
before a zombie producer coming back to life tries to continue with the
transaction, e.g. sending offsets or committing, so I should handle the
exception first, and I could directly create a new producer there instead of
doing it in the callback.
The curious part for me is that I was able to reproduce a case that
simulates a zombie producer trying to send offsets after a rebalance,
but instead of failing with a ProducerFencedException it fails with a
CommitFailedException with this message: "Transaction offset Commit failed
due to consumer group metadata mismatch: Specified group generation id is
not valid.", which makes sense but is not even documented in
KafkaProducer#sendOffsetsToTransaction.
Is this the expected behaviour, or should it fail with a
ProducerFencedException when the generation.id is outdated?
The case I reproduced is like this:
1. Consumer A subscribes to topic X partitions 0 and 1, and starts a
producer with transactional.id = "tid.123"
2. It consumes a message from partition 1 and sends it to another thread to
be processed (so the poll thread is not blocked)
3. Producer A begins a transaction, sends to output topic and gets blocked
(I'm using a lock here to simulate a long processing) before calling
sendOffsetsToTransaction
4. Consumer B is created and gets assigned partition 1 (I'm using
CooperativeStickyAssignor) and creates a producer with transactional.id =
"tid.456"
5. Consumer B fetches the same message, processes it and commits the
transaction successfully
6. Producer A calls sendOffsetsToTransaction (because the lock was
released) and fails with CommitFailedException

This behaviour reflects what is described here
https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification,
but I was actually expecting a ProducerFencedException instead. Does that
exception only correspond to fencing done by transactional.id?

Thanks

On Tue, May 24, 2022 at 20:30, Guozhang Wang ()
wrote:

> No problem.
>
> The key is that at step 4, when the consumer re-joins it will be aware that
> it has lost its previously assigned partitions and will trigger
> `onPartitionsLost` on the rebalance callback. And since in your scenario
> it's a 1-1 mapping from consumer to producer, it means the producer has
> been fenced and hence should be closed.
>
> So in that step 4, the old producer with Client A should be closed within
> the rebalance callback, and then one can create a new producer to pair with
> the re-joined consumer.
>
> On Tue, May 24, 2022 at 1:30 PM Gabriel Giussi 
> wrote:
>
> > Last question, the fencing occurs with the sendOffsetsToTransaction which
> > includes ConsumerGroupMetadata, I guess the generation.id is what
> matters
> > here since it is bumped with each rebalance.
> > But couldn't this happen?
> > 1. Client A consumes from topic partition P1 with generation.id = 1 and
> a
> > producer associated to it produces to some output topic but a long GC
> pause
> > occurs before calling sendOffsetsToTransaction
> > 2. Client A gets out of sync and becomes a zombie due to session timeout,
> > group rebalanced.
> > 3. Client B is assigned topic partition P1 with generation.id = 2, calls
> > sendOffsetsToTransaction and commits the txn
> > 4. Client A is back online and joins again with generation.id = 3 (this
> > happens in some internal thread)
> > 5. The thread that was about to call sendOffsetsToTransaction is
> scheduled
> > and calls sendOffsetsToTransaction with generation.id = 3 which is the
> > current one so it won't be fenced.
> >
> > I'm asking this because we are always asking the current
> > consumerGroupMetadata to the consumer object, not the one that was used
> to
> > consume the offsets, like this
> > producer.sendOffsetsToTransaction(consumedOffsets,
> > consumer.groupMetadata());
> >
> > Couldn't this return a groupMetadata that has a valid generation.id even
> > when it is not the same at the moment of consuming the messages that are
> > about to be commited?
> >
> > I'm sure I'm missing something (probably in step 4) that makes this not a
> > possible scenario, but I can't say what it is.
> >
> > Sorry if the question is too confusing.
> >
> >
> >
> >
> >
> >
> > On Tue, May 24, 2022 at 12:49, Guozhang Wang ()
> > wrote:
> >
> > > Hi Gabriel,
> > >
> > > What I meant is that with KIP-447, the fencing is achieved by the time
> of
> > > committing with the consumer metadata. If within a transaction, the
> > > producer would always try to commit at least once on behalf of 

How it is safe to break message ordering but not idempotency after getting an OutOfOrderSequenceException?

2022-05-27 Thread Gabriel Giussi
The docs say
"This exception indicates that the broker received an unexpected sequence
number from the producer, which means that data may have been lost. If the
producer is configured for idempotence only (i.e. if enable.idempotence is
set and no transactional.id is configured), it is possible to continue
sending with the same producer instance, but doing so risks reordering of
sent record"

Isn't the broker using the monotonically increasing sequence number to
dedup messages? So how can it break message ordering without breaking
idempotency?
I can't see an example scenario where this could happen. I guess
the OutOfOrderSequenceException can only happen
with max.in.flight.requests.per.connection > 1, but even in that case why
wouldn't we keep getting an OutOfOrderSequenceException instead of a
success that broke message ordering?
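For reference, the producer configuration I'm asking about is the
idempotence-only one, something like this (a sketch; the broker address is a
placeholder):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Idempotence only: enable.idempotence is set and no transactional.id is configured.
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
// I assume the exception can only show up with more than one in-flight request.
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);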

Thanks.


Can CooperativeStickyAssignor be used with transactions?

2022-05-26 Thread Gabriel Giussi
This is the scenario I have in mind

1. Client A gets assigned partitions P1 and P2.
2. Client A polls a message with offset X from P1, opens a transaction and
produces to some output topic.
3. Client B joins the group and gets assigned P2
4. Client A tries to sendOffsets with group metadata but is fenced (due to
the rebalance that bumped the generation.id). At this moment I can't use
that producer anymore so I will just "ignore" that message, since the
broker will abort the transaction.
5. Client A polls again and will get the message with offset X + 1, since
that partition wasn't revoked due to the cooperative rebalance.

In an eager rebalance this can't happen since the rebalance that bumped the
generation.id and fenced my producer will also revoke the partitions and
force my consumer to fetch the offset again and poll the message with
offset X again.
I couldn't test this locally yet since it isn't trivial, but could this
happen in theory, or am I missing something?
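If the scenario is real, the only mitigation that comes to my mind (just my
own idea sketched out, not something I found in the docs) would be to rewind
the consumer to the last committed offsets after being fenced, since the
cooperative rebalance won't revoke the partition for me:

import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// After catching the fencing error and giving up on the current transaction:
Set<TopicPartition> assigned = consumer.assignment();
Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(assigned);
for (TopicPartition tp : assigned) {
    OffsetAndMetadata offset = committed.get(tp);
    if (offset != null) {
        // Rewind so the message with offset X is polled again after the failed tx.
        consumer.seek(tp, offset.offset());
    }
    // If there is no committed offset yet, leave the position untouched.
}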

Thanks.


Re: What role plays transactional.id after KIP-447?

2022-05-24 Thread Gabriel Giussi
Last question: the fencing occurs with sendOffsetsToTransaction, which
includes the ConsumerGroupMetadata; I guess the generation.id is what
matters here since it is bumped with each rebalance.
But couldn't this happen?
1. Client A consumes from topic partition P1 with generation.id = 1 and a
producer associated to it produces to some output topic but a long GC pause
occurs before calling sendOffsetsToTransaction
2. Client A gets out of sync and becomes a zombie due to session timeout,
group rebalanced.
3. Client B is assigned topic partition P1 with generation.id = 2, calls
sendOffsetsToTransaction and commits the txn
4. Client A is back online and joins again with generation.id = 3 (this
happens in some internal thread)
5. The thread that was about to call sendOffsetsToTransaction is scheduled
and calls sendOffsetsToTransaction with generation.id = 3 which is the
current one so it won't be fenced.

I'm asking this because we always ask the consumer object for the current
consumerGroupMetadata, not the one that was in effect when the offsets were
consumed, like this:
producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

Couldn't this return a groupMetadata with a valid generation.id even when it
is not the same as at the moment of consuming the messages that are about to
be committed?

I'm sure I'm missing something (probably in step 4) that makes this not a
possible scenario, but I can't say what it is.

Sorry if the question is too confusing.






On Tue, May 24, 2022 at 12:49, Guozhang Wang ()
wrote:

> Hi Gabriel,
>
> What I meant is that with KIP-447, the fencing is achieved by the time of
> committing with the consumer metadata. If within a transaction, the
> producer would always try to commit at least once on behalf of the
> consumer, AND a zombie of the producer would always come from a zombie of a
> consumer, then the transaction would be guaranteed to be fenced. But:
>
> 1) If within a transaction, there's no `sendOffset..` triggered, then
> fencing still need to be done by the txn coordinator, and txn.id plays the
> role here  I think this is not your scenario.
> 2) If a consumer may be "represented" by multiple producers, and a zombie
> producer does not come from a zombie consumer, then we still need the
> fencing be done via the txn.id --- this is the scenario I'd like to remind
> you about. For example, if two producers could be (mistakenly) created with
> different txn.ids and are paired with the same consumer, then the new API
> in KIP-447 would not fence one of them.
>
> Guozhang
>
> On Tue, May 24, 2022 at 5:50 AM Gabriel Giussi 
> wrote:
>
> > Hello Guozhang,
> >
> > thanks for the response, I have some doubts about the "N-1
> > producer-consumer" case you mentioned and why I may need to configure the
> > transactional id there and how. Is this a case of N consumers sharing the
> > same producer right?
> >
> > My current implementation is creating a consumer per topic (I don't
> > subscribe to multiple topics from the same consumer) and starting a
> > producer per consumer, so the relation is 1 consumer/topic => 1 producer
> > and the transactional id is set as
> --.
> > Do you see any problem with this configuration?
> >
> > Thanks again.
> >
> > On Sat, May 21, 2022 at 16:37, Guozhang Wang ()
> > wrote:
> >
> > > Hello Gabriel,
> > >
> > > What you're asking is a very fair question :) In fact, for Streams
> where
> > > the partition-assignment to producer-consumer pairs are purely
> flexible,
> > we
> > > think the new EOS would not have hard requirement on transactional.id:
> > > https://issues.apache.org/jira/browse/KAFKA-9453
> > >
> > > If you implemented the transactional messaging via a DIY
> producer+consumer
> > > though, it depends on how you'd expect the life-time of a producer,
> e.g.
> > if
> > > you do not have a 1-1 producer-consumer mapping then transactional.id
> is
> > > not crucial, but if you have an N-1 producer-consumer mapping then you
> > may
> > > still need to configure that id.
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > > On Fri, May 20, 2022 at 8:39 AM Gabriel Giussi <
> gabrielgiu...@gmail.com>
> > > wrote:
> > >
> > > > Before KIP-447 I understood the use of transactional.id to prevent
> us
> > > from
> > > > zombies introducing duplicates, as explained in this talk
> > > > https://youtu.be/j0l_zUhQaTc?t=822.
> > > > So in order to get zombie fencing working correctly we should assign
> > > > producers with a transactio

Re: What role plays transactional.id after KIP-447?

2022-05-24 Thread Gabriel Giussi
Hello Guozhang,

thanks for the response. I have some doubts about the "N-1
producer-consumer" case you mentioned, and why and how I may need to
configure the transactional id there. Is this the case of N consumers
sharing the same producer?

My current implementation is creating a consumer per topic (I don't
subscribe to multiple topics from the same consumer) and starting a
producer per consumer, so the relation is 1 consumer/topic => 1 producer
and the transactional id is set as  --.
Do you see any problem with this configuration?

Thanks again.

On Sat, May 21, 2022 at 16:37, Guozhang Wang ()
wrote:

> Hello Gabriel,
>
> What you're asking is a very fair question :) In fact, for Streams where
> the partition-assignment to producer-consumer pairs are purely flexible, we
> think the new EOS would not have hard requirement on transactional.id:
> https://issues.apache.org/jira/browse/KAFKA-9453
>
> If you implemented the transactional messaging via a DIY producer+consumer
> though, it depends on how you'd expect the life-time of a producer, e.g. if
> you do not have a 1-1 producer-consumer mapping then transactional.id is
> not crucial, but if you have an N-1 producer-consumer mapping then you may
> still need to configure that id.
>
>
> Guozhang
>
>
>
> On Fri, May 20, 2022 at 8:39 AM Gabriel Giussi 
> wrote:
>
> > Before KIP-447 I understood the use of transactional.id to prevent us
> from
> > zombies introducing duplicates, as explained in this talk
> > https://youtu.be/j0l_zUhQaTc?t=822.
> > So in order to get zombie fencing working correctly we should assign
> > producers with a transactional.id that included the partition id,
> > something
> > like -, as shown in this slide
> > https://youtu.be/j0l_zUhQaTc?t=1047 where processor 2 should use the
> same
> > txnl.id A as the process 1 that crashed.
> > This prevented us from having process 2 consuming the message again and
> > committing, while process 1 could come back to life and also commit the
> > pending transaction, hence having duplicates message being produced. In
> > this case process 1 will be fenced by having an outdated epoch.
> >
> > With KIP-447 we no longer have that potential scenario of two pending
> > transactions trying to produce and mark a message as committed, because
> we
> > won't let process 2 even start the transaction if there is a pending one
> > (basically by not returning any messages since we reject the Offset Fetch
> > if a there is a pending transaction for that offset partition). This is
> > explained in this post
> >
> >
> https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification
> >
> > Having that, I don't see anymore the value of transactional.id or how I
> > should configure it in my producers. The main benefit of KIP-447 is that
> we
> > no longer have to start one producer per input partition, a quote from
> the
> > post
> > "The only way the static assignment requirement could be met is if each
> > input partition uses a separate producer instance, which is in fact what
> > Kafka Streams previously relied on. However, this made running EOS
> > applications much more costly in terms of the client resources and load
> on
> > the brokers. A large number of client connections could heavily impact
> the
> > stability of brokers and become a waste of resources as well."
> >
> > I guess now I can reuse my producer between different input partitions,
> so
> > what transactional.id should I assign to it and why should I care, isn't
> > zombie fencing resolved by rejecting offset fetch already?
> >
> > Thanks.
> >
>
>
> --
> -- Guozhang
>


What role plays transactional.id after KIP-447?

2022-05-20 Thread Gabriel Giussi
Before KIP-447 I understood the use of transactional.id as preventing
zombies from introducing duplicates, as explained in this talk
https://youtu.be/j0l_zUhQaTc?t=822.
So in order to get zombie fencing working correctly we should assign each
producer a transactional.id that includes the partition id, something
like -, as shown in this slide
https://youtu.be/j0l_zUhQaTc?t=1047 where processor 2 should use the same
txnl.id A as process 1 that crashed.
This prevented process 2 from consuming the message again and committing
while process 1 could come back to life and also commit the pending
transaction, hence producing duplicate messages. In this case process 1 will
be fenced by having an outdated epoch.
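In code, my understanding of the pre-KIP-447 pattern is roughly this (a
sketch; the id format is made up, the point is just that it includes the
input partition):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// One producer per input partition, so the transactional.id is stable across
// restarts and a zombie instance working on the same partition gets fenced.
static KafkaProducer<String, String> producerForPartition(String inputTopic, int partition) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-app-" + inputTopic + "-" + partition);
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    producer.initTransactions(); // bumps the epoch, fencing any previous producer with the same id
    return producer;
}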

With KIP-447 we no longer have that potential scenario of two pending
transactions trying to produce and mark a message as committed, because we
won't let process 2 even start the transaction if there is a pending one
(basically by not returning any messages, since we reject the offset fetch
if there is a pending transaction for that offset partition). This is
explained in this post
https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/#client-api-simplification

Having that, I no longer see the value of transactional.id or how I should
configure it in my producers. The main benefit of KIP-447 is that we no
longer have to start one producer per input partition; a quote from the
post:
"The only way the static assignment requirement could be met is if each
input partition uses a separate producer instance, which is in fact what
Kafka Streams previously relied on. However, this made running EOS
applications much more costly in terms of the client resources and load on
the brokers. A large number of client connections could heavily impact the
stability of brokers and become a waste of resources as well."

I guess now I can reuse my producer between different input partitions, so
what transactional.id should I assign to it and why should I care? Isn't
zombie fencing already resolved by rejecting the offset fetch?
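For context, the post-KIP-447 setup I have in mind looks roughly like this
(a sketch; the transactional.id value is arbitrary, which is exactly my
question):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// A single producer for the whole process, no partition baked into the id:
// fencing should now come from the group metadata passed to sendOffsetsToTransaction.
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-app-instance-1");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();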

Thanks.


Re: What causes partitions to be revoked?

2018-04-06 Thread Gabriel Giussi
Ok, the other thing that comes to my mind is to check the
max.poll.interval.ms configuration. You said that "poll is invoked
regularly" but this isn't very specific.
The default value for max.poll.interval.ms is 5 minutes (300000 millis), so
if you are executing a poll regularly every 6 minutes, you will see
rebalancing.
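For example, something like this on the consumer (just a sketch) would raise
the allowed time between polls if the processing really takes that long:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
// Default is 300000 (5 minutes); raise it if one poll/processing cycle can take longer...
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000"); // 10 minutes
// ...or reduce the work per poll so the loop comes back to poll() sooner.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");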


2018-04-05 19:01 GMT-03:00 Scott Thibault <scott.thiba...@multiscalehn.com>:

> No, there is only one consumer in the group.
>
>
> On Thu, Apr 5, 2018 at 2:39 PM, Gabriel Giussi <gabrielgiu...@gmail.com>
> wrote:
>
> > There is some other consumer (in the same process or another) using the
> > same group.id?
> >
> > 2018-04-05 14:36 GMT-03:00 Scott Thibault <scott.thibault@multiscalehn.
> com
> > >:
> >
> > > I'm using the Kafka 1.0.1 Java client with 1 consumer and 1 partition
> and
> > > using the ConsumerRebalanceListener I can see that the partition keeps
> > > getting revoked and then reassigned.  My consumer is in it's own thread
> > to
> > > ensure poll is invoked regularly.  Is there some other reason this
> might
> > be
> > > happening?
> > >
> >
>


Re: What causes partitions to be revoked?

2018-04-05 Thread Gabriel Giussi
Is there some other consumer (in the same process or another) using the
same group.id?

2018-04-05 14:36 GMT-03:00 Scott Thibault :

> I'm using the Kafka 1.0.1 Java client with 1 consumer and 1 partition and
> using the ConsumerRebalanceListener I can see that the partition keeps
> getting revoked and then reassigned.  My consumer is in it's own thread to
> ensure poll is invoked regularly.  Is there some other reason this might be
> happening?
>


Closing a KafkaConsumer triggers a rebalance in KafkaConsumers subscribed to other topics. Is this the expected behavior?

2018-04-03 Thread Gabriel Giussi
Kafka brokers version: 0.11.0.0
Kafka client version: 0.11.0.2

If we have two KafkaConsumers using the same group.id (running in the same
process or in two different processes) and one of them is closed, it
triggers a rebalance in the other KafkaConsumer even if they were
subscribed to different topics.
I suppose the brokers must be taking into account only the group.id for a
rebalance, and not the subscribed topics corresponding to the
(group_id, member_id) pair of the LeaveGroupRequest, but I'm wondering if
this is the expected behavior or something that should be improved.

I guess it is probably the former, to avoid a more complex rebalance in the
broker, and considering that the workaround is very simple, i.e. just use
different group ids for KafkaConsumers that subscribe to different topics,
even if they are running in the same process.

Thanks.


Re: Testing with MockConsumer

2018-02-19 Thread Gabriel Giussi
Hi Ted,
my mistake was believing that committed offsets are used on the next poll,
but that is not the case:
<https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c14018cf7d3a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1202>

> The offsets committed using this API will be used on the first fetch after
> every rebalance and also on startup
>

So, what to do after a failed commit depends on the nature of the
exception, I guess.

   - WakeupException: retry
   - Others: close consumer

Thanks for your help solving #2. I'm still wondering about #1 and #3.

2018-02-19 11:46 GMT-03:00 Ted Yu <yuzhih...@gmail.com>:

> For #2, I think the assumption is that the records are processed by the
> loop:
>
> https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c1
> 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> consumer/MockConsumer.java#L164
>
>
>
> On Mon, Feb 19, 2018 at 4:39 AM, Gabriel Giussi <gabrielgiu...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I'm trying to use MockConsumer to test my application code but I've
> faced a
> > couple of limitations and I want to know if there are workarounds or
> > something that I'm overlooking.
> > Note: I'm using kafka-clients v 0.11.0.2
> >
> >
> >1. Why the addRecord
> ><https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c1
> > 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> > consumer/MockConsumer.java#L179>
> >requires that the consumer has assigned partitions? Given that this is
> > just
> >simulating records being produced or existing records.
> >2. Why the poll
> ><https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c1
> > 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> > consumer/MockConsumer.java#L132>
> >clear the map of records? It should not be cleared after commit?
> >3. Why the commitAsync
> ><https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c1
> > 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> > consumer/MockConsumer.java#L198>
> >doesn't check for an exception and always succeed?
> >
> > Due to items (2) and (3) I'm not be able to test scenarios where the
> > commits fails and the consumer should poll again the same elements.
> >
> > If someone knows about other scenarios that can't be tested with
> > MockConsumer, please let me know.
> >
> > Thanks.
> >
>


Testing with MockConsumer

2018-02-19 Thread Gabriel Giussi
Hi,

I'm trying to use MockConsumer to test my application code but I've faced a
couple of limitations and I want to know if there are workarounds or
something that I'm overlooking.
Note: I'm using kafka-clients v 0.11.0.2


   1. Why does addRecord require that the consumer has assigned partitions,
   given that this is just simulating records being produced or existing
   records?
   2. Why does poll clear the map of records? Shouldn't it be cleared only
   after a commit?
   3. Why does commitAsync never check for an exception and always succeed?

Due to items (2) and (3) I'm not able to test scenarios where the commit
fails and the consumer should poll the same elements again.
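For reference, this is roughly how I'm driving MockConsumer today, which is
where I ran into item 1 (just a sketch; the topic name and types are made
up):

import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
TopicPartition tp = new TopicPartition("test-topic", 0);

// addRecord only works once the consumer has an assignment and a beginning offset.
consumer.assign(Collections.singletonList(tp));
consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
consumer.addRecord(new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"));

// The records map is cleared by this poll, regardless of whether a commit happens.
ConsumerRecords<String, String> records = consumer.poll(100);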

If someone knows about other scenarios that can't be tested with
MockConsumer, please let me know.

Thanks.