[ https://issues.apache.org/jira/browse/KAFKA-16350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825278#comment-17825278 ]

Bruno Cadonna commented on KAFKA-16350:
---------------------------------------

Thanks for the report [~mjsax]!

I think the bug is in this line: 
https://github.com/apache/kafka/blob/fbbfafe1f556f424bf511697db6f399e5a622aa3/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L940

We do not re-initialize the task before we re-add it to the state updater.

I am currently working on a PR that will change that part of the code: 
https://github.com/apache/kafka/pull/15227. I will keep this bug in mind.
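To make the missing step concrete, here is a minimal Java sketch under simplifying assumptions. All names (`ThreadProducer`, `ToyTask`, `reAddAfterCanceledClose`, `initializeTasks`) are hypothetical and are not Kafka Streams internals. The producer's transaction initialization is reduced to a boolean flag, and "re-initializing" a task is modeled as moving it back to `CREATED`, so that the next initialization pass initializes the recreated producer for transactions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the re-add path; not Kafka Streams code.
public class ReinitOnCancelSketch {

    enum TaskState { CREATED, RUNNING }

    static final class ToyTask {
        // The task was already initialized before the error occurred.
        TaskState state = TaskState.RUNNING;
    }

    // Per-thread producer shared by all tasks, as with EOSv2.
    static final class ThreadProducer {
        boolean transactionsInitialized = false;

        // Recreating the producer after an error yields one that is not
        // initialized for transactions yet.
        void recreate() {
            transactionsInitialized = false;
        }

        // Guard: only the first CREATED task actually initializes transactions.
        void initTransactionsIfNeeded() {
            if (!transactionsInitialized) {
                transactionsInitialized = true; // stands in for initTransactions()
            }
        }
    }

    // Called for a task whose scheduled "close task" action was canceled.
    static void reAddAfterCanceledClose(List<ToyTask> stateUpdater, ToyTask task,
                                        boolean reinitialize) {
        if (reinitialize) {
            task.state = TaskState.CREATED; // assumption: re-init means back to CREATED
        }
        stateUpdater.add(task);
    }

    // Initialization pass: only tasks in CREATED trigger producer initialization.
    static void initializeTasks(List<ToyTask> stateUpdater, ThreadProducer producer) {
        for (ToyTask task : stateUpdater) {
            if (task.state == TaskState.CREATED) {
                producer.initTransactionsIfNeeded();
                task.state = TaskState.RUNNING;
            }
        }
    }

    static boolean producerReadyAfterError(boolean reinitialize) {
        ThreadProducer producer = new ThreadProducer();
        producer.initTransactionsIfNeeded(); // initialized once before the error
        producer.recreate();                 // error: fresh, uninitialized producer

        List<ToyTask> stateUpdater = new ArrayList<>();
        for (int i = 0; i < 3; i++) {        // every task's close action is canceled
            reAddAfterCanceledClose(stateUpdater, new ToyTask(), reinitialize);
        }
        initializeTasks(stateUpdater, producer);
        return producer.transactionsInitialized;
    }

    public static void main(String[] args) {
        System.out.println("without re-init: " + producerReadyAfterError(false)); // false
        System.out.println("with re-init:    " + producerReadyAfterError(true));  // true
    }
}
```

With `reinitialize` set to `false`, every task comes back already `RUNNING`, so the initialization pass never touches the recreated producer. This matches the failure described in the quoted report. Setting it to `true` routes the tasks through the `CREATED` path again.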


> StateUpdater does not init transaction after canceling task close action
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-16350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16350
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Major
>         Attachments: tyh5pkfmgwfoe-org.apache.kafka.streams.integration.EosIntegrationTest-shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2, processing threads true]-1-output.txt
>
>
> With EOSv2, we use a thread producer shared across all tasks. We init tx on
> the producer with each _task_ (due to EOSv1, which uses a producer per task),
> and have a guard in place to only init tx a single time.
> If we hit an error, we close the producer and create a new one, which is
> not yet initialized for transactions. At the same time, with the state
> updater, we schedule a "close task" action on error.
> For each task we get back, we cancel the "close task" action to actually
> keep the task. If this happens for _all_ tasks, we don't have any task in
> state CREATED at hand, and thus we never init the producer for transactions,
> because we assume this was already done.
> On the first `send` request, we crash with an IllegalStateException:
> {code:java}
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION 
> {code}
> This bug is exposed via EosIntegrationTest (logs attached).
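The quoted exception comes from the producer's transaction state machine: a transaction can only begin after `initTransactions()` has moved the producer out of `UNINITIALIZED`. The Java sketch below models that check with a deliberately reduced, hypothetical state set (the real Kafka producer has more states, such as `INITIALIZING`). `ToyTxProducer` and `firstSend` are illustrative names, and having `send` open a transaction on first use is an assumption modeled on the "first `send` request" in the report.

```java
import java.util.EnumSet;
import java.util.Map;

// Toy, heavily reduced model of a transactional producer's state machine.
// Hypothetical: not the real Kafka TransactionManager.
public class TxStateSketch {

    enum State { UNINITIALIZED, READY, IN_TRANSACTION }

    // Allowed source states for each target state in this toy model.
    static final Map<State, EnumSet<State>> ALLOWED_FROM = Map.of(
            State.UNINITIALIZED, EnumSet.noneOf(State.class),
            State.READY, EnumSet.of(State.UNINITIALIZED),
            State.IN_TRANSACTION, EnumSet.of(State.READY));

    static final class ToyTxProducer {
        State state = State.UNINITIALIZED;

        private void transitionTo(State target) {
            if (!ALLOWED_FROM.get(target).contains(state)) {
                throw new IllegalStateException("Invalid transition attempted from state "
                        + state + " to state " + target);
            }
            state = target;
        }

        void initTransactions() {
            transitionTo(State.READY);
        }

        // Assumption: the first send opens a transaction if none is open yet.
        void send(String record) {
            if (state != State.IN_TRANSACTION) {
                transitionTo(State.IN_TRANSACTION);
            }
            // the record would be appended to the open transaction here
        }
    }

    // Returns the exception message, or "ok" if the first send succeeded.
    static String firstSend(boolean initFirst) {
        ToyTxProducer producer = new ToyTxProducer(); // e.g. freshly recreated after an error
        if (initFirst) {
            producer.initTransactions();
        }
        try {
            producer.send("record");
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION
        System.out.println(firstSend(false));
        System.out.println(firstSend(true)); // ok
    }
}
```

A recreated producer starts in `UNINITIALIZED` again, so skipping `initTransactions()` on it reproduces the transition error regardless of how many times the previous producer was initialized.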



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
