guozhangwang opened a new pull request #11372:
URL: https://github.com/apache/kafka/pull/11372


   We forbid sending produce request before adding partitions in the txn in the 
following way:
   
   * In `Sender#runOnce`, we would not continue to `sendProducerData` as long 
as `maybeSendAndPollTransactionalRequest()` returns true.
   * Inside `maybeSendAndPollTransactionalRequest` as long as 
`!newPartitionsInTransaction.isEmpty()` we would enqueue an 
`AddPartitionsToTxn` request to the queue, so in normal cases 
`maybeSendAndPollTransactionalRequest` would return `true` which would forbid 
us to send produce request in that iteration.
   * One thing that caught my eyes however, is that inside 
`transactionManager.nextRequest(accumulator.hasIncomplete())` where we call 
`maybeTerminateRequestWithError(nextRequestHandler)`, if the state is already 
`hasAbortableError()` we would fail the request immediately and hence the 
caller would return `null`, in which case 
`maybeSendAndPollTransactionalRequest` would return false.
   * At the `Sender#runOnce()`, we only check `if 
(transactionManager.hasFatalError())` and if yes we would not proceed to send 
produce requests.
   
   So I think there's a possible trace where we would send produce request 
before sending addPartitionsToTxn requests:
   
   1) First of all, the state of the transaction is already in `abortable 
error`.
   2) In `Sender#runOnce`, we would first pass 
`transactionManager.hasFatalError()` since we are not in `fatal error`
   3) And then in `maybeSendAndPollTransactionalRequest`, although we would 
generate an `addPartitionsToTxn` request, that request would fail immediately 
due to the `abortable error` state.
   4) And then we would pass `maybeSendAndPollTransactionalRequest` and 
continue to `sendProducerData` assuming the metadata for the destination 
brokers are available.
   
   In this case, a producer request could be sent out while the 
addPartitionsToTxn request is failed.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to