Hi there

[ Running Kafka 0.10 and using Java api ]

Please could you confirm the subscription behaviour for the KafkaConsumer
when using a group ID.

I have two separate KafkaConsumer processes, both using the same groupID, on
different topics A & B:

Process 1) Subscribes to Topic A with groupID = 123 and writes to topic B
Process 2) Subscribes to Topic B with groupID = 123
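For reference, Process 2's setup is roughly the following (a minimal sketch, assuming the 0.10 kafka-clients jar on the classpath and String-serialized messages; the bootstrap address and class name are illustrative):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Process2 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "123");  // same groupID as Process 1
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Process 1 does the same but subscribes to "A" and produces to "B"
        consumer.subscribe(Collections.singletonList("B"));
        // ... poll loop ...
    }
}
```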

If I start Process 1 (Process 2 not yet running) it functions correctly,
consuming from Topic A and writing to topic B.

As soon as I start Process 2, when Process 1 subsequently attempts to commit
the offsets (after, say, consuming 100 messages from Topic A), it fails to
commit them with the CommitFailedException shown below.

It appears that as soon as Process 2 subscribes with groupID = 123 on Topic
B, it kicks Process 1's subscription (also on groupID = 123, but on Topic
A) out of the group; is the consumer coordinator responsible?

Incidentally, Process 1 commits the offsets at regular intervals whether or
not the offset has moved on; regardless of whether Process 1 has consumed
any more messages, it still performs a commitAsync and the exception is
thrown either way.
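Process 1's periodic commit looks roughly like this (a sketch; the callback is added here so the failure is logged rather than lost, and the consumer is assumed to be the one subscribed to Topic A):

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitHelper {
    // Periodic commit as done in Process 1, with a callback so any
    // commit failure is surfaced.
    static void commitWithLogging(KafkaConsumer<String, String> consumer) {
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                // After Process 2 joins the group, the coordinator fails
                // the commit because the partitions have been reassigned.
                System.err.println("Commit failed for " + offsets
                        + ": " + exception);
            }
        });
    }
}
```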

My understanding was that two separate consumers could share the same
groupID as long as each subscribes to a single, different topic; is that
correct?

Have I configured the Kafka / brokers incorrectly or is this correct
behaviour?

Btw, if I change Process 2 to use a different groupID = 789 it's fine and
there is no rebalancing behaviour by the consumer coordinator.

Regards


Alex Gardner


Exception
========

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms, which
typically implies that the poll loop is spending too much time message
processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with
max.poll.records.

            at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:770)
            at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:716)
            at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
            at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
            at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
            at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
            at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
            at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
            at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
            at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
            at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
            at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)