[ 
https://issues.apache.org/jira/browse/KAFKA-9620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17047910#comment-17047910
 ] 

ASF GitHub Bot commented on KAFKA-9620:
---------------------------------------

guozhangwang commented on pull request #8187: KAFKA-9620: Do not throw in the 
middle of consumer user callbacks
URL: https://github.com/apache/kafka/pull/8187
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Task revocation failure could introduce remaining unclean tasks
> ---------------------------------------------------------------
>
>                 Key: KAFKA-9620
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9620
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> The task revocation call should enforce that the task is closed; otherwise a 
> task left unclean could trigger an exception during `handleAssignment`.
> The revocation failed as follows:
>  
> {code:java}
> [2020-02-27T11:05:48-08:00] 
> (streams-soak-trunk-eos_soak_i-099dc04bf946ce2f0_streamslog) [2020-02-27 
> 19:05:47,321] ERROR 
> [stream-soak-test-d1c291a8-ee54-4058-ac9c-7cd46d5484de-StreamThread-1] 
> [Consumer 
> clientId=stream-soak-test-d1c291a8-ee54-4058-ac9c-7cd46d5484de-StreamThread-1-consumer,
>  groupId=stream-soak-test] User provided listener 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener failed 
> on invocation of onPartitionsRevoked for partitions [logs.json.kafka-2, 
> logs.json.zookeeper-2, node-name-repartition-1, logs.kubernetes-2, 
> windowed-node-counts-1, logs.operator-2, logs.syslog-2] 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> [2020-02-27T11:05:48-08:00] 
> (streams-soak-trunk-eos_soak_i-099dc04bf946ce2f0_streamslog) 
> org.apache.kafka.streams.errors.TaskMigratedException: Producer get fenced 
> trying to commit a transaction; it means all tasks belonging to this thread 
> should be migrated.
>         at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:172)
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.commit(RecordCollectorImpl.java:226)
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.commitState(StreamTask.java:368)
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:242)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRevocation(TaskManager.java:314)
>         at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:72)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:297)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:383)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:477)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1277)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1218)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:920)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:800)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:725)
> [2020-02-27T11:05:48-08:00] 
> (streams-soak-trunk-eos_soak_i-099dc04bf946ce2f0_streamslog) Caused by: 
> org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an 
> operation with an old epoch. Either there is a newer producer with the same 
> transactionalId, or the producer's transaction has been expired by the broker.
> {code}
> During assignment, we check whether each task was closed cleanly and throw a fatal exception if not:
> {code:java}
> [2020-02-27T11:05:48-08:00] 
> (streams-soak-trunk-eos_soak_i-099dc04bf946ce2f0_streamslog) [2020-02-27 
> 19:05:48,032] ERROR 
> [stream-soak-test-d1c291a8-ee54-4058-ac9c-7cd46d5484de-StreamThread-1] 
> stream-thread 
> [stream-soak-test-d1c291a8-ee54-4058-ac9c-7cd46d5484de-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread) 
> [2020-02-27T11:05:48-08:00] 
> (streams-soak-trunk-eos_soak_i-099dc04bf946ce2f0_streamslog) 
> java.lang.RuntimeException: Unexpected failure to close 1 task(s) [[0_2]]. 
> First exception (for task 0_2) follows.
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:205)
>         at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1176)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:397)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:477)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1277)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1218)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:920)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:800)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:725)
> [2020-02-27T11:05:48-08:00] 
> (streams-soak-trunk-eos_soak_i-099dc04bf946ce2f0_streamslog) Caused by: 
> org.apache.kafka.streams.errors.TaskMigratedException: Producer get fenced 
> trying to commit a transaction; it means all tasks belonging to this thread 
> should be migrated.
> {code}
>  
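
To illustrate the behaviour the description asks for, here is a minimal sketch of a revocation loop that never leaves a task half-closed. It is not the code from the pull request: `RevocationSketch`, `Task`, `DemoTask`, and `revokeAll` are hypothetical names, and the real `TaskManager` logic may differ. The assumption is that a task whose commit fails can be closed without committing ("dirty"), and that the first failure is reported only after every task has been handled.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative sketch only: all names here are hypothetical and are not
// the actual Kafka Streams classes or the change made in the pull request.
class RevocationSketch {

    interface Task {
        String id();
        void suspendAndCommit(); // may throw, e.g. if the producer was fenced
        void closeDirty();       // releases resources without committing
    }

    // Simple in-memory task used to exercise the loop below.
    static class DemoTask implements Task {
        final String id;
        final boolean failOnCommit;
        boolean closed = false;

        DemoTask(String id, boolean failOnCommit) {
            this.id = id;
            this.failOnCommit = failOnCommit;
        }

        public String id() { return id; }

        public void suspendAndCommit() {
            if (failOnCommit) {
                throw new IllegalStateException("commit failed for task " + id);
            }
        }

        public void closeDirty() { closed = true; }
    }

    // Suspends every owned task. A task whose commit fails is closed dirty and
    // dropped from the owned list, so a later assignment step never sees it.
    // Returns the first failure (or null) instead of throwing mid-loop.
    static RuntimeException revokeAll(List<Task> owned) {
        RuntimeException first = null;
        Iterator<Task> it = owned.iterator();
        while (it.hasNext()) {
            Task task = it.next();
            try {
                task.suspendAndCommit();
            } catch (RuntimeException e) {
                if (first == null) {
                    first = e;
                }
                task.closeDirty();
                it.remove();
            }
        }
        return first;
    }

    public static void main(String[] args) {
        List<Task> owned = new ArrayList<>();
        owned.add(new DemoTask("0_1", false));
        owned.add(new DemoTask("0_2", true));
        RuntimeException first = revokeAll(owned);
        System.out.println("still owned: " + owned.size() + ", first error: " + first);
    }
}
```

Returning the first exception rather than throwing inside the loop lets the caller decide when to surface it, for example after the remaining rebalance callbacks have run.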



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
