[
https://issues.apache.org/jira/browse/KAFKA-9620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17047841#comment-17047841
]
Guozhang Wang commented on KAFKA-9620:
--------------------------------------
I synced with Boyang yesterday over the PR, and I think the root cause of 1) is
that we intentionally avoid throwing immediately from onPartitionsRevoked.
Instead, we always run onPartitionsRevoked, onAssignment, and
onPartitionsAssigned in turn, remember any exceptions along the way, and throw
only after all three have executed. So this is not actually a bug but intended
behavior. The real problem is that we wrap a KafkaException (including
TaskMigratedException) again in a RuntimeException; we should not, and this is
being fixed in the PR.
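The comment above describes two behaviors: every rebalance callback still runs after an earlier one fails, and a KafkaException such as TaskMigratedException should reach the caller with its type intact rather than wrapped in a RuntimeException. A minimal sketch of that pattern follows. `KafkaException` and `TaskMigratedException` here are local stand-ins for the real Kafka classes, and `DeferredRebalanceErrors`, `runPhase`, and `throwIfFailed` are hypothetical names, not the actual `ConsumerCoordinator` or `TaskManager` code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for org.apache.kafka.common.KafkaException and
// org.apache.kafka.streams.errors.TaskMigratedException (illustration only).
class KafkaException extends RuntimeException {
    KafkaException(String message) { super(message); }
}

class TaskMigratedException extends KafkaException {
    TaskMigratedException(String message) { super(message); }
}

// Hypothetical sketch: run every rebalance phase, remember the first failure,
// and rethrow it only after all phases have executed.
public class DeferredRebalanceErrors {
    private RuntimeException firstError = null;
    private final List<String> executed = new ArrayList<>();

    // Runs one phase; a failure is recorded instead of aborting the remaining phases.
    void runPhase(String name, Runnable phase) {
        executed.add(name);
        try {
            phase.run();
        } catch (RuntimeException e) {
            if (firstError == null) {
                firstError = e; // keep only the first failure
            }
        }
    }

    // Called after the last phase. A KafkaException (including TaskMigratedException)
    // is rethrown unchanged so callers can still catch it by type; anything else is wrapped.
    void throwIfFailed() {
        if (firstError == null) {
            return;
        }
        if (firstError instanceof KafkaException) {
            throw firstError;
        }
        throw new RuntimeException("Unexpected failure during rebalance", firstError);
    }

    List<String> executed() { return executed; }

    public static void main(String[] args) {
        DeferredRebalanceErrors errors = new DeferredRebalanceErrors();
        errors.runPhase("onPartitionsRevoked", () -> {
            throw new TaskMigratedException("producer fenced while committing");
        });
        errors.runPhase("onAssignment", () -> { });
        errors.runPhase("onPartitionsAssigned", () -> { });
        try {
            errors.throwIfFailed();
        } catch (TaskMigratedException e) {
            // The original type survives, so the thread can handle it as a task migration.
            System.out.println("caught TaskMigratedException after " + errors.executed());
        }
    }
}
```

Rethrowing by type is what lets the stream thread treat a fenced producer as a recoverable task migration instead of the fatal "Unexpected failure to close" shutdown seen in the logs below.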
> Task revocation failure could introduce remaining unclean tasks
> ---------------------------------------------------------------
>
> Key: KAFKA-9620
> URL: https://issues.apache.org/jira/browse/KAFKA-9620
> Project: Kafka
> Issue Type: Bug
> Reporter: Boyang Chen
> Assignee: Boyang Chen
> Priority: Major
>
> The task revocation call should enforce closing the task; otherwise we could
> hit an exception during `handleAssignment`.
> During revocation we failed with:
>
> {code:java}
> [2020-02-27T11:05:48-08:00] (streams-soak-trunk-eos_soak_i-099dc04bf946ce2f0_streamslog) [2020-02-27 19:05:47,321] ERROR [stream-soak-test-d1c291a8-ee54-4058-ac9c-7cd46d5484de-StreamThread-1] [Consumer clientId=stream-soak-test-d1c291a8-ee54-4058-ac9c-7cd46d5484de-StreamThread-1-consumer, groupId=stream-soak-test] User provided listener org.apache.kafka.streams.processor.internals.StreamsRebalanceListener failed on invocation of onPartitionsRevoked for partitions [logs.json.kafka-2, logs.json.zookeeper-2, node-name-repartition-1, logs.kubernetes-2, windowed-node-counts-1, logs.operator-2, logs.syslog-2] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> [2020-02-27T11:05:48-08:00] (streams-soak-trunk-eos_soak_i-099dc04bf946ce2f0_streamslog) org.apache.kafka.streams.errors.TaskMigratedException: Producer get fenced trying to commit a transaction; it means all tasks belonging to this thread should be migrated.
> at org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:172)
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.commit(RecordCollectorImpl.java:226)
> at org.apache.kafka.streams.processor.internals.StreamTask.commitState(StreamTask.java:368)
> at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:242)
> at org.apache.kafka.streams.processor.internals.TaskManager.handleRevocation(TaskManager.java:314)
> at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:72)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:297)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:383)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:477)
> at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1277)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1218)
> at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:920)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:800)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:725)
> [2020-02-27T11:05:48-08:00] (streams-soak-trunk-eos_soak_i-099dc04bf946ce2f0_streamslog) Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
> {code}
> During assignment, we check whether the tasks were closed cleanly and throw a fatal exception:
> {code:java}
> [2020-02-27T11:05:48-08:00] (streams-soak-trunk-eos_soak_i-099dc04bf946ce2f0_streamslog) [2020-02-27 19:05:48,032] ERROR [stream-soak-test-d1c291a8-ee54-4058-ac9c-7cd46d5484de-StreamThread-1] stream-thread [stream-soak-test-d1c291a8-ee54-4058-ac9c-7cd46d5484de-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down: (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-02-27T11:05:48-08:00] (streams-soak-trunk-eos_soak_i-099dc04bf946ce2f0_streamslog) java.lang.RuntimeException: Unexpected failure to close 1 task(s) [[0_2]]. First exception (for task 0_2) follows.
> at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:205)
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1176)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:397)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:477)
> at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1277)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1218)
> at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:920)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:800)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:725)
> [2020-02-27T11:05:48-08:00] (streams-soak-trunk-eos_soak_i-099dc04bf946ce2f0_streamslog) Caused by: org.apache.kafka.streams.errors.TaskMigratedException: Producer get fenced trying to commit a transaction; it means all tasks belonging to this thread should be migrated.
> {code}
>
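The issue asks that revocation always close the task, so that `handleAssignment` never finds one left in an unclean state. A minimal sketch of that idea under stated assumptions: `RevocationSketch`, `Task`, `revokeAll`, and `tasksLeftOpen` are hypothetical names, not the real `TaskManager`/`StreamTask` code, and an `IllegalStateException` stands in for the fenced-producer failure. A failed commit is remembered, but the task is still marked closed dirty instead of being left open.

```java
import java.util.List;

// Hypothetical sketch of "revocation must always close the task": if the commit during
// revocation fails, the task is still closed (dirty) instead of being left half-closed.
public class RevocationSketch {
    enum State { RUNNING, CLOSED_CLEAN, CLOSED_DIRTY }

    // Stand-in for a stream task; commitFails simulates a fenced producer.
    static final class Task {
        final String id;
        final boolean commitFails;
        State state = State.RUNNING;

        Task(String id, boolean commitFails) {
            this.id = id;
            this.commitFails = commitFails;
        }

        void commit() {
            if (commitFails) {
                throw new IllegalStateException("producer fenced while committing task " + id);
            }
        }
    }

    // Revokes every task. A failed commit is remembered and returned, but the task is
    // still marked closed dirty, so nothing is left open for the assignment step.
    static RuntimeException revokeAll(List<Task> tasks) {
        RuntimeException firstError = null;
        for (Task task : tasks) {
            try {
                task.commit();
                task.state = State.CLOSED_CLEAN;
            } catch (RuntimeException e) {
                task.state = State.CLOSED_DIRTY; // closed without a successful commit
                if (firstError == null) {
                    firstError = e;
                }
            }
        }
        return firstError;
    }

    // Mirrors the assignment-time check: counts tasks that revocation failed to close.
    static long tasksLeftOpen(List<Task> tasks) {
        return tasks.stream().filter(t -> t.state == State.RUNNING).count();
    }

    public static void main(String[] args) {
        List<Task> tasks = List.of(new Task("0_1", false), new Task("0_2", true));
        RuntimeException error = revokeAll(tasks);
        System.out.println("first error: " + (error == null ? "none" : error.getMessage()));
        System.out.println("tasks left open: " + tasksLeftOpen(tasks));
    }
}
```

Returning the first error instead of throwing it lets the caller defer it until the rest of the rebalance has run, in line with Guozhang's description of the intended flow.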