[ 
https://issues.apache.org/jira/browse/KAFKA-9620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17047176#comment-17047176
 ] 

Sophie Blee-Goldman commented on KAFKA-9620:
--------------------------------------------

[~guozhang] I agree with your second observation, but w.r.t. your first point I 
think we don't swallow anything we shouldn't: TaskMigrated is thrown all the 
way up through poll (or would have been, if onAssignment hadn't thrown its 
own RuntimeException).

If we do hit a TaskMigrated in a rebalance callback there's not much we can do, 
since the ConsumerCoordinator should still complete all callbacks. We know the 
remaining callbacks are doomed to hit TaskMigrated and fail as well, so the 
only optimization available is to prevent subsequent callbacks from attempting 
further close/create operations once any previous callback within the same 
poll call has hit a TaskMigrated (or fatal) exception.
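
A minimal sketch of that optimization, to make the idea concrete. It assumes 
a hypothetical RebalanceGuard helper and a stand-in exception type; neither 
is an existing Kafka class, and the real change in TaskManager / 
StreamsRebalanceListener may look quite different.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.streams.errors.TaskMigratedException.
class TaskMigratedSketchException extends RuntimeException {
    TaskMigratedSketchException(String message) {
        super(message);
    }
}

// Hypothetical guard: remembers the first failure raised by any rebalance
// callback within one poll() call and turns later callbacks into no-ops.
class RebalanceGuard {
    private RuntimeException firstFailure; // null while no callback has failed

    // Call before each poll() so a failure from an earlier poll does not leak.
    void resetForPoll() {
        firstFailure = null;
    }

    // Runs the callback body unless an earlier callback already failed.
    // Returns true only if the body ran to completion.
    boolean runCallback(Runnable body) {
        if (firstFailure != null) {
            return false; // skip further close/create work, it is doomed anyway
        }
        try {
            body.run();
            return true;
        } catch (RuntimeException e) {
            firstFailure = e; // remember it, then rethrow so the caller still sees it
            throw e;
        }
    }

    RuntimeException firstFailure() {
        return firstFailure;
    }
}

class RebalanceGuardDemo {
    public static void main(String[] args) {
        RebalanceGuard guard = new RebalanceGuard();
        List<String> log = new ArrayList<>();

        guard.resetForPoll();
        try {
            // Simulates onPartitionsRevoked failing because the producer was fenced.
            guard.runCallback(() -> {
                throw new TaskMigratedSketchException("producer fenced");
            });
        } catch (TaskMigratedSketchException e) {
            log.add("revocation failed");
        }

        // Simulates onAssignment: it is skipped instead of attempting to close tasks.
        boolean ran = guard.runCallback(() -> log.add("assignment handled"));
        System.out.println(ran + " " + log);
    }
}
```

In this sketch the second callback is skipped silently; after poll returns, 
the caller would still need to inspect firstFailure() (or rely on the 
rethrown exception) to trigger the usual TaskMigrated handling.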

> Task revocation failure could introduce remaining unclean tasks
> ---------------------------------------------------------------
>
>                 Key: KAFKA-9620
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9620
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> Task revocation should enforce that the task is closed; otherwise we could 
> hit an exception during `handleAssignment`.
> During revocation we failed with:
>  
> {code:java}
> [2020-02-27T11:05:48-08:00] 
> (streams-soak-trunk-eos_soak_i-099dc04bf946ce2f0_streamslog) [2020-02-27 
> 19:05:47,321] ERROR 
> [stream-soak-test-d1c291a8-ee54-4058-ac9c-7cd46d5484de-StreamThread-1] 
> [Consumer 
> clientId=stream-soak-test-d1c291a8-ee54-4058-ac9c-7cd46d5484de-StreamThread-1-consumer,
>  groupId=stream-soak-test] User provided listener 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener failed 
> on invocation of onPartitionsRevoked for partitions [logs.json.kafka-2, 
> logs.json.zookeeper-2, node-name-repartition-1, logs.kubernetes-2, 
> windowed-node-counts-1, logs.operator-2, logs.syslog-2] 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> [2020-02-27T11:05:48-08:00] 
> (streams-soak-trunk-eos_soak_i-099dc04bf946ce2f0_streamslog) 
> org.apache.kafka.streams.errors.TaskMigratedException: Producer get fenced 
> trying to commit a transaction; it means all tasks belonging to this thread 
> should be migrated.
>         at org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:172)
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.commit(RecordCollectorImpl.java:226)
>         at org.apache.kafka.streams.processor.internals.StreamTask.commitState(StreamTask.java:368)
>         at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:242)
>         at org.apache.kafka.streams.processor.internals.TaskManager.handleRevocation(TaskManager.java:314)
>         at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:72)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:297)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:383)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:477)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1277)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1218)
>         at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:920)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:800)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:725)
> [2020-02-27T11:05:48-08:00] 
> (streams-soak-trunk-eos_soak_i-099dc04bf946ce2f0_streamslog) Caused by: 
> org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an 
> operation with an old epoch. Either there is a newer producer with the same 
> transactionalId, or the producer's transaction has been expired by the broker.
> {code}
> During assignment we check whether the tasks were closed cleanly and throw a fatal exception:
> {code:java}
> [2020-02-27T11:05:48-08:00] 
> (streams-soak-trunk-eos_soak_i-099dc04bf946ce2f0_streamslog) [2020-02-27 
> 19:05:48,032] ERROR 
> [stream-soak-test-d1c291a8-ee54-4058-ac9c-7cd46d5484de-StreamThread-1] 
> stream-thread 
> [stream-soak-test-d1c291a8-ee54-4058-ac9c-7cd46d5484de-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread) 
> [2020-02-27T11:05:48-08:00] 
> (streams-soak-trunk-eos_soak_i-099dc04bf946ce2f0_streamslog) 
> java.lang.RuntimeException: Unexpected failure to close 1 task(s) [[0_2]]. 
> First exception (for task 0_2) follows.
>         at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:205)
>         at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1176)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:397)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:477)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1277)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1218)
>         at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:920)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:800)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:725)
> [2020-02-27T11:05:48-08:00] 
> (streams-soak-trunk-eos_soak_i-099dc04bf946ce2f0_streamslog) Caused by: 
> org.apache.kafka.streams.errors.TaskMigratedException: Producer get fenced 
> trying to commit a transaction; it means all tasks belonging to this thread 
> should be migrated.
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)