[ 
https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136215#comment-17136215
 ] 

Guozhang Wang commented on KAFKA-10167:
---------------------------------------

The proposed solution is that, even under EOS, we do not use 
consumer.endOffset, which would set the `read-committed` flag, but instead send 
list-offset with `read-uncommitted` to get the end-offset.
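
To illustrate why the isolation level matters here, below is a minimal toy 
model, not Kafka code. The class, enum, and field names are all hypothetical, 
and it assumes that a `read-committed` list-offset returns the LSO (capped at 
the first offset of an open txn) while a `read-uncommitted` one returns the 
high-watermark.

```java
import java.util.OptionalLong;

/** Toy model of one changelog partition; hypothetical names, not the Kafka API. */
public class ChangelogEndOffsetSketch {

    enum Isolation { READ_UNCOMMITTED, READ_COMMITTED }

    /** Hypothetical stand-in for the broker-side view of a single partition. */
    static final class PartitionState {
        final long highWatermark;              // offset just past the last fully replicated record
        final OptionalLong firstOpenTxnOffset; // first offset of a txn whose marker is not yet appended

        PartitionState(long highWatermark, OptionalLong firstOpenTxnOffset) {
            this.highWatermark = highWatermark;
            this.firstOpenTxnOffset = firstOpenTxnOffset;
        }

        /** Last stable offset: cannot move past the start of the earliest open txn. */
        long lastStableOffset() {
            return firstOpenTxnOffset.isPresent()
                    ? Math.min(firstOpenTxnOffset.getAsLong(), highWatermark)
                    : highWatermark;
        }

        /** End offset as seen by a list-offset request with the given isolation level. */
        long endOffset(Isolation isolation) {
            return isolation == Isolation.READ_COMMITTED ? lastStableOffset() : highWatermark;
        }
    }

    public static void main(String[] args) {
        // Records 0..99 are replicated; the previous writer's txn started at offset 80
        // and its marker has not yet been appended to this partition.
        PartitionState state = new PartitionState(100L, OptionalLong.of(80L));
        System.out.println("read-committed end-offset:   " + state.endOffset(Isolation.READ_COMMITTED));
        System.out.println("read-uncommitted end-offset: " + state.endOffset(Isolation.READ_UNCOMMITTED));
    }
}
```

In this sketch a restore that stops at the `read-committed` end-offset would 
skip offsets 80..99, while the `read-uncommitted` end-offset covers them.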

The rationale is that, since this changelog topic is single-writer, 
single-reader, and we control every writer and reader of it, we can safely 
assume that any on-going txn comes only from our previous writer. 

If the task migration is due to a graceful rebalance (i.e. the task is indeed 
being revoked from the other host), then the old host would always commit, 
during which it blocks on `producer.flush` to make sure all data are written. 
(By default we do not override the replication factor on changelog topics or 
the producer's `acks` config, so if users change one without the other they may 
bump into other issues where data are not completely replicated and hence the 
high-watermark returned from list-offset can be smaller.) Therefore the 
end-offset returned would be the actual log-end-offset, with or without the 
txn-marker, either of which is fine.

If the task migration is unexpected (i.e. the task was not proactively 
revoked; the old host may not know it is out of the group, or it has crashed), 
then although not all records sent from the old host are guaranteed to be on 
the broker and covered by the end-offset, that is fine since these records 
will eventually be aborted anyway.

> Streams EOS-Beta should not try to get end-offsets as read-committed
> --------------------------------------------------------------------
>
>                 Key: KAFKA-10167
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10167
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Guozhang Wang
>            Priority: Major
>
> This is a bug discovered with the new EOS protocol (KIP-447). Here's the 
> context:
> In Streams, when we are assigned new active tasks, we first try to restore 
> the state from the changelog topic all the way to the log end offset, and 
> then we can transition from the `restoring` to the `running` state to start 
> processing the task.
> Before KIP-447, the end-offset call was only triggered after we had passed 
> the synchronization barrier at the txn-coordinator, which guarantees that the 
> txn-marker has been sent and received (otherwise we would error with 
> CONCURRENT_TRANSACTIONS and let the producer retry). When the txn-marker is 
> received, it also means that the marker has been fully replicated, which in 
> turn guarantees that the data written before that marker has been fully 
> replicated. As a result, when we send the list-offset with the 
> `read-committed` flag we are guaranteed that the returned offset == LSO == 
> high-watermark.
> After KIP-447, however, we do not fence on the txn-coordinator but on the 
> group-coordinator upon offset-fetch, and the group-coordinator returns the 
> fetched offset right after it has received and replicated the txn-marker 
> sent to it. However, since the txn-markers are sent to different brokers in 
> parallel, and even within the same broker markers of different partitions are 
> appended / replicated independently, when the offset-fetch request returns it 
> is NOT guaranteed that the LSO on other data partitions has been advanced as 
> well. Hence in that case the `endOffset` call may return a smaller offset, 
> causing data loss.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
