Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Dong,

I see what you mean. This would have been clearer if the committed offset
was the offset of the last consumed record. I don't feel too strongly about
it either. Perhaps we can use the more concise name and just rely on the
documentation to explain its usage. It should be rare that users have to
think about this anyway.

-Jason

On Tue, Aug 7, 2018 at 1:35 PM, Dong Lin wrote:

> Hey Jason,
>
> Thanks for the reply. Regarding 3), I am thinking that both "Offset" and
> "LastLeaderEpoch" in the OffsetCommitRequest are associated with the last
> consumed messages. The value of "Offset" is not necessarily the offset of
> the next message, due to log compaction. Since we are not naming "Offset"
> as e.g. "NextOffset", it may be simpler to use "LeaderEpoch".
>
> I don't feel strongly about this. If we decide to name the new field
> "LastLeaderEpoch", would it be more consistent to also name the new field
> "LastLeaderEpoch" in the Offset Commit Value Schema?
>
> Thanks,
> Dong
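To make the naming question in 3) concrete, here is a minimal sketch (not the KIP's API) of what a consumer would commit after processing a batch. The names CommitPositionSketch, Rec, CommitPosition and fromBatch are hypothetical stand-ins, not Kafka client types. The sketch assumes the committed offset is the last consumed offset plus one, while the epoch is taken from the last consumed record.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins for illustration; these are not Kafka client classes.
final class CommitPositionSketch {

    /** A consumed record reduced to the two fields that matter here. */
    static final class Rec {
        final long offset;
        final int leaderEpoch;

        Rec(long offset, int leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }
    }

    /** The next offset to read, paired with the epoch of the last consumed record. */
    static final class CommitPosition {
        final long nextOffset;
        final int lastLeaderEpoch;

        CommitPosition(long nextOffset, int lastLeaderEpoch) {
            this.nextOffset = nextOffset;
            this.lastLeaderEpoch = lastLeaderEpoch;
        }
    }

    /** Derives the commit position from a batch; empty if nothing was consumed. */
    static Optional<CommitPosition> fromBatch(List<Rec> batch) {
        if (batch.isEmpty()) {
            return Optional.empty();
        }
        Rec last = batch.get(batch.size() - 1);
        return Optional.of(new CommitPosition(last.offset + 1, last.leaderEpoch));
    }

    public static void main(String[] args) {
        // Offset 41 is missing on purpose, as it could be after compaction.
        List<Rec> batch = Arrays.asList(new Rec(40, 3), new Rec(42, 4));
        CommitPosition p = fromBatch(batch).get();
        System.out.println(p.nextOffset + " @ epoch " + p.lastLeaderEpoch); // prints "43 @ epoch 4"
    }
}
```

Note the asymmetry the thread is debating: the offset points one past the last consumed record, the epoch does not. Under log compaction a record at nextOffset may not exist, which is the caveat Dong raises.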
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Jason,

Thanks for the reply. Regarding 3), I am thinking that both "Offset" and
"LastLeaderEpoch" in the OffsetCommitRequest are associated with the last
consumed messages. The value of "Offset" is not necessarily the offset of
the next message, due to log compaction. Since we are not naming "Offset"
as e.g. "NextOffset", it may be simpler to use "LeaderEpoch".

I don't feel strongly about this. If we decide to name the new field
"LastLeaderEpoch", would it be more consistent to also name the new field
"LastLeaderEpoch" in the Offset Commit Value Schema?

Thanks,
Dong

On Tue, Aug 7, 2018 at 1:23 PM, Jason Gustafson wrote:

> Hi Dong,
>
> Thanks for the comments.
>
> 1) Yes, makes sense.
>
> 2) This is an interesting point. The suggestion made more sense in the
> initial version of the KIP, but I think you are right that we should use
> the same fencing semantics we use for the Fetch and OffsetForLeaderEpoch
> APIs. Just like a following replica, we need to protect the initial fetch
> to a leader. If the initial fetch position was obtained from a broker with
> the same epoch, then we can begin fetching. Otherwise, we need the
> OffsetForLeaderEpoch dance. And it is better to avoid using offsets
> obtained from stale leaders in the first place. I'll update the KIP.
>
> 3) The committed offset is actually the next offset that a consumer will
> read, right? The leader epoch on the other hand is the last one that was
> consumed.
>
> 4) Yes, that is right. And it is a good point that the epoch we use for
> log reconciliation must be less than or equal to the current leader epoch.
> I will mention this.
>
> Thanks,
> Jason
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hi Dong,

Thanks for the comments.

1) Yes, makes sense.

2) This is an interesting point. The suggestion made more sense in the
initial version of the KIP, but I think you are right that we should use
the same fencing semantics we use for the Fetch and OffsetForLeaderEpoch
APIs. Just like a following replica, we need to protect the initial fetch
to a leader. If the initial fetch position was obtained from a broker with
the same epoch, then we can begin fetching. Otherwise, we need the
OffsetForLeaderEpoch dance. And it is better to avoid using offsets
obtained from stale leaders in the first place. I'll update the KIP.

3) The committed offset is actually the next offset that a consumer will
read, right? The leader epoch on the other hand is the last one that was
consumed.

4) Yes, that is right. And it is a good point that the epoch we use for log
reconciliation must be less than or equal to the current leader epoch. I
will mention this.

Thanks,
Jason

On Tue, Aug 7, 2018 at 12:43 PM, Jason Gustafson wrote:

> Hey Jun,
>
> 57. It's a fair point. I could go either way, but I'm slightly inclined to
> just document the new API for now. We'll still support seeking to an offset
> with corresponding epoch information, so deprecating the old seek() seems
> like overkill.
>
> 60. The phrasing was a little confusing. Does this sound better?
>
> "Log truncation is detected if there exists a leader epoch which is
> larger than this epoch and begins at an offset earlier than the committed
> offset."
>
> Thanks,
> Jason
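The decision described in point 2) of Jason's reply (begin fetching only when the position came from a broker with the same epoch, otherwise validate first) can be sketched roughly as follows. This is an illustration under stated assumptions, not the consumer's actual code: InitialFetchSketch, Action and nextStep are hypothetical names, and the third branch (refreshing metadata when the position's epoch is ahead of the client's view) is an assumption inferred from point 4), which requires the reconciliation epoch to be less than or equal to the current leader epoch.

```java
// Hypothetical sketch; names are illustrative and not part of the Kafka client.
final class InitialFetchSketch {

    enum Action {
        FETCH,              // position epoch matches the current leader epoch
        VALIDATE_POSITION,  // do the OffsetForLeaderEpoch round trip first
        REFRESH_METADATA    // assumption: our view of the leader epoch is behind
    }

    /**
     * @param positionEpoch      epoch attached to the initial fetch position
     * @param currentLeaderEpoch epoch the client currently believes the leader has
     */
    static Action nextStep(int positionEpoch, int currentLeaderEpoch) {
        if (positionEpoch == currentLeaderEpoch) {
            return Action.FETCH;
        }
        if (positionEpoch < currentLeaderEpoch) {
            return Action.VALIDATE_POSITION;
        }
        return Action.REFRESH_METADATA;
    }

    public static void main(String[] args) {
        System.out.println(nextStep(5, 5)); // prints "FETCH"
        System.out.println(nextStep(4, 5)); // prints "VALIDATE_POSITION"
        System.out.println(nextStep(6, 5)); // prints "REFRESH_METADATA"
    }
}
```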
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Jun,

57. It's a fair point. I could go either way, but I'm slightly inclined to
just document the new API for now. We'll still support seeking to an offset
with corresponding epoch information, so deprecating the old seek() seems
like overkill.

60. The phrasing was a little confusing. Does this sound better?

"Log truncation is detected if there exists a leader epoch which is
larger than this epoch and begins at an offset earlier than the committed
offset."

Thanks,
Jason

On Tue, Aug 7, 2018 at 12:11 PM, Dong Lin wrote:

> Hey Jason,
>
> Thanks for the update. I have some comments below:
>
> 1) Since FencedLeaderEpochException indicates that the metadata in the
> client is outdated, should it extend InvalidMetadataException?
>
> 2) It is mentioned that "To fix the problem with KIP-232, we will add the
> leader epoch the ListOffsets response. The consumer will use this in its
> first fetch request after resetting offsets". If the consumer sends a
> ListOffsetRequest to a broker that is no longer the leader but still
> thinks it is, then the broker may return a ListOffsetResponse whose
> leaderEpoch is smaller than the leaderEpoch in the consumer's metadata.
> In this case the consumer probably should not just send a FetchRequest
> with the leaderEpoch from the ListOffsetResponse, right? I am wondering
> whether we should also include CurrentLeaderEpoch in the
> ListOffsetRequest.
>
> 3) Currently the new field added in the OffsetCommitRequest/
> OffsetFetchResponse is named LastLeaderEpoch. For the same reason that we
> are not naming the existing field "Offset" as "LastOffset", would it be
> more consistent to just name the new field LeaderEpoch? Same for the new
> API in the class OffsetAndMetadata.
>
> 4) Could we clarify in the KIP where the value of CurrentLeaderEpoch in
> the FetchRequest comes from? I suppose this value can be updated by the
> MetadataResponse, right? If so, maybe we should also clarify that the
> client should reject a MetadataResponse if the leaderEpoch in the metadata
> response is smaller than what the client already knows from e.g.
> seek(...) or OffsetFetchResponse?
>
> Thanks,
> Dong
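The rephrased rule in 60 ("there exists a leader epoch which is larger than this epoch and begins at an offset earlier than the committed offset") can be read as a simple predicate over the leader's epoch history. The sketch below applies that sentence literally. TruncationCheckSketch, EpochEntry and isTruncated are hypothetical names, and scanning a full epoch history on the client is an assumption made for illustration; in the KIP the consumer would learn the relevant boundary through the OffsetForLeaderEpoch API rather than by holding the whole history.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the truncation rule; not the Kafka implementation.
final class TruncationCheckSketch {

    /** One entry of a leader's epoch history: the epoch and its first offset. */
    static final class EpochEntry {
        final int epoch;
        final long startOffset;

        EpochEntry(int epoch, long startOffset) {
            this.epoch = epoch;
            this.startOffset = startOffset;
        }
    }

    /**
     * True if some epoch larger than lastLeaderEpoch begins at an offset
     * earlier than committedOffset.
     */
    static boolean isTruncated(List<EpochEntry> leaderHistory,
                               long committedOffset,
                               int lastLeaderEpoch) {
        for (EpochEntry e : leaderHistory) {
            if (e.epoch > lastLeaderEpoch && e.startOffset < committedOffset) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<EpochEntry> history = Arrays.asList(
                new EpochEntry(0, 0), new EpochEntry(1, 100), new EpochEntry(2, 120));

        // Consumed up to offset 149 in epoch 1, but the leader says epoch 2 began at 120.
        System.out.println(isTruncated(history, 150, 1)); // prints "true"
        // Committed offset 110 lies before the start of epoch 2.
        System.out.println(isTruncated(history, 110, 1)); // prints "false"
        // No epoch larger than 2 exists.
        System.out.println(isTruncated(history, 150, 2)); // prints "false"
    }
}
```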
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Jason,

Thanks for the update. I have some comments below:

1) Since FencedLeaderEpochException indicates that the metadata in the
client is outdated, should it extend InvalidMetadataException?

2) It is mentioned that "To fix the problem with KIP-232, we will add the
leader epoch the ListOffsets response. The consumer will use this in its
first fetch request after resetting offsets". If the consumer sends a
ListOffsetRequest to a broker that is no longer the leader but still
thinks it is, then the broker may return a ListOffsetResponse whose
leaderEpoch is smaller than the leaderEpoch in the consumer's metadata.
In this case the consumer probably should not just send a FetchRequest
with the leaderEpoch from the ListOffsetResponse, right? I am wondering
whether we should also include CurrentLeaderEpoch in the ListOffsetRequest.

3) Currently the new field added in the OffsetCommitRequest/
OffsetFetchResponse is named LastLeaderEpoch. For the same reason that we
are not naming the existing field "Offset" as "LastOffset", would it be
more consistent to just name the new field LeaderEpoch? Same for the new
API in the class OffsetAndMetadata.

4) Could we clarify in the KIP where the value of CurrentLeaderEpoch in the
FetchRequest comes from? I suppose this value can be updated by the
MetadataResponse, right? If so, maybe we should also clarify that the
client should reject a MetadataResponse if the leaderEpoch in the metadata
response is smaller than what the client already knows from e.g.
seek(...) or OffsetFetchResponse?

Thanks,
Dong

On Mon, Aug 6, 2018 at 5:30 PM, Jun Rao wrote:

> Hi, Jason,
>
> Thanks for the reply. They all make sense. Just a couple more minor
> comments.
>
> 57. I was thinking that it will be useful to encourage people to use the
> new seek() api to get better semantics. Deprecating the old seek api is
> one way. I guess we could also just document it for now.
>
> 60. "Log truncation is detected if the first offset of the epoch for the
> committed offset is larger than this epoch and begins at an earlier
> offset." It seems that we should add "that" before "is larger than"?
>
> Thanks,
>
> Jun
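Dong's point 4) suggests that the client should refuse to move its known leader epoch backwards when a MetadataResponse carries a smaller epoch than one already learned elsewhere. A minimal sketch of that bookkeeping is below. LeaderEpochTrackerSketch and its methods are hypothetical, partitions are keyed by a plain string for brevity, and using -1 for "unknown" is an assumption of this sketch rather than something taken from the KIP.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; the real client keeps this state in its own metadata classes.
final class LeaderEpochTrackerSketch {

    // Highest leader epoch seen so far per partition, from any source
    // (metadata response, seek with an epoch, OffsetFetchResponse, ...).
    private final Map<String, Integer> lastSeenEpoch = new HashMap<>();

    /**
     * Records an epoch for the partition unless it is older than one already known.
     *
     * @return false if the update was rejected as stale
     */
    boolean updateIfNotStale(String partition, int epoch) {
        Integer known = lastSeenEpoch.get(partition);
        if (known != null && epoch < known) {
            return false;
        }
        lastSeenEpoch.put(partition, epoch);
        return true;
    }

    /** Returns the highest epoch seen for the partition, or -1 if none. */
    int currentEpoch(String partition) {
        return lastSeenEpoch.getOrDefault(partition, -1);
    }

    public static void main(String[] args) {
        LeaderEpochTrackerSketch tracker = new LeaderEpochTrackerSketch();
        tracker.updateIfNotStale("topic-0", 5);                        // e.g. learned from a committed offset
        System.out.println(tracker.updateIfNotStale("topic-0", 4));    // prints "false": stale metadata
        System.out.println(tracker.currentEpoch("topic-0"));           // prints "5"
    }
}
```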
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hi, Jason,

Thanks for the reply. They all make sense. Just a couple more minor
comments.

57. I was thinking that it will be useful to encourage people to use the
new seek() api to get better semantics. Deprecating the old seek api is one
way. I guess we could also just document it for now.

60. "Log truncation is detected if the first offset of the epoch for the
committed offset is larger than this epoch and begins at an earlier
offset." It seems that we should add "that" before "is larger than"?

Thanks,

Jun

On Mon, Aug 6, 2018 at 3:07 PM, Jason Gustafson wrote:

> Hi Jun,
>
> I spent a little more time looking at the usage in WorkerSinkTask. I think
> actually the initialization of the positions in the assignment callback is
> not strictly necessary. We keep a map of the current consumed offsets which
> is updated as we consume the data. As far as I can tell, we could either
> skip the initialization and wait until the first fetched records come in or
> we could use the committed() API to initialize positions. I think the root
> of it is the argument Anna made previously. The leader epoch lets us track
> the history of records that we have consumed. It is only useful when we
> want to tell whether records we have consumed were lost. So getting the
> leader epoch of an arbitrary position that was seeked doesn't really make
> sense. The dependence on the consumed records is most explicit if we only
> expose the leader epoch inside the fetched records. We might consider
> adding a `lastConsumedLeaderEpoch` API to expose it directly, but I'm
> inclined to leave that as potential future work.
>
> A couple additional notes:
>
> 1. I've renamed OffsetAndMetadata.leaderEpoch to
> OffsetAndMetadata.lastLeaderEpoch. In fact, the consumer doesn't know what
> the leader epoch of the committed offset should be, so this just clarifies
> the expected usage.
>
> 2. I decided to add a helper to ConsumerRecords to get the next offsets. We
> would use this in WorkerSinkTask and external storage use cases to simplify
> the commit logic. If we are consuming batch by batch, then we don't need
> the message-level bookkeeping.
>
> Thanks,
> Jason
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hi Jun,

I spent a little more time looking at the usage in WorkerSinkTask. I think the initialization of the positions in the assignment callback is actually not strictly necessary. We keep a map of the current consumed offsets, which is updated as we consume the data. As far as I can tell, we could either skip the initialization and wait until the first fetched records come in, or we could use the committed() API to initialize positions.

I think the root of it is the argument Anna made previously. The leader epoch lets us track the history of records that we have consumed. It is only useful when we want to tell whether records we have consumed were lost. So getting the leader epoch of an arbitrary position that was seeked to doesn't really make sense. The dependence on the consumed records is most explicit if we only expose the leader epoch inside the fetched records. We might consider adding a `lastConsumedLeaderEpoch` API to expose it directly, but I'm inclined to leave that as potential future work.

A couple of additional notes:

1. I've renamed OffsetAndMetadata.leaderEpoch to OffsetAndMetadata.lastLeaderEpoch. In fact, the consumer doesn't know what the leader epoch of the committed offset should be, so this just clarifies the expected usage.

2. I decided to add a helper to ConsumerRecords to get the next offsets. We would use this in WorkerSinkTask and external storage use cases to simplify the commit logic. If we are consuming batch by batch, then we don't need the message-level bookkeeping.

Thanks,
Jason

On Mon, Aug 6, 2018 at 10:28 AM, Jason Gustafson wrote:

> Hey Jun,
>
> Thanks for the review. Responses below:
>
> 50. Yes, that is right. I clarified this in the KIP.
>
> 51. Yes, I updated the KIP to mention this.
>
> 52. Yeah, this was a reference to a previous iteration. I've fixed it.
>
> 53. I changed the API to use an `Optional` for the leader epoch and added a note about the default value. Does that seem reasonable?
>
> 54. We discussed this above, but could not find a great option. The options are to add a new API (e.g. positionAndEpoch) or to rely on the user to get the epoch from the fetched records. We were leaning toward the latter, but I admit it was not fully satisfying. In this case, Connect would need to track the last consumed offsets manually instead of relying on the consumer. We also considered adding a convenience method to ConsumerRecords to get the offset to commit for all fetched partitions. This makes the additional bookkeeping pretty minimal. What do you think?
>
> 55. I clarified in the KIP. I was mainly thinking of situations where a previously valid offset becomes out of range.
>
> 56. Yeah, that's a bit annoying. I decided to keep LeaderEpoch as it is and use CurrentLeaderEpoch for both the OffsetForLeaderEpoch and Fetch APIs. I think Dong suggested this previously as well.
>
> 57. We could, but I'm not sure there's a strong reason to do so. I was thinking we would leave it around for convenience, but let me know if you think we should do otherwise.
>
> Thanks,
> Jason
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Jun,

Thanks for the review. Responses below:

50. Yes, that is right. I clarified this in the KIP.

51. Yes, I updated the KIP to mention this.

52. Yeah, this was a reference to a previous iteration. I've fixed it.

53. I changed the API to use an `Optional` for the leader epoch and added a note about the default value. Does that seem reasonable?

54. We discussed this above, but could not find a great option. The options are to add a new API (e.g. positionAndEpoch) or to rely on the user to get the epoch from the fetched records. We were leaning toward the latter, but I admit it was not fully satisfying. In this case, Connect would need to track the last consumed offsets manually instead of relying on the consumer. We also considered adding a convenience method to ConsumerRecords to get the offset to commit for all fetched partitions. This makes the additional bookkeeping pretty minimal. What do you think?

55. I clarified in the KIP. I was mainly thinking of situations where a previously valid offset becomes out of range.

56. Yeah, that's a bit annoying. I decided to keep LeaderEpoch as it is and use CurrentLeaderEpoch for both the OffsetForLeaderEpoch and Fetch APIs. I think Dong suggested this previously as well.

57. We could, but I'm not sure there's a strong reason to do so. I was thinking we would leave it around for convenience, but let me know if you think we should do otherwise.

Thanks,
Jason

On Fri, Aug 3, 2018 at 4:49 PM, Jun Rao wrote:

> Hi, Jason,
>
> Thanks for the updated KIP. Well thought-through. Just a few minor comments below.
>
> 50. For seek(TopicPartition partition, OffsetAndMetadata offset), I guess that under the covers it will make an OffsetsForLeaderEpoch request to determine if the seeked offset is still valid before fetching? If so, it would be useful to document this in the wiki.
>
> 51. Similarly, if the consumer fetch request gets FENCED_LEADER_EPOCH, I guess the consumer will also make an OffsetsForLeaderEpoch request to determine if the last consumed offset is still valid before fetching? If so, it would be useful to document this in the wiki.
>
> 52. "If the consumer seeks to the middle of the log, for example, then we will use the sentinel value -1 and the leader will skip the epoch validation." Is this true? If the consumer seeks using seek(TopicPartition partition, OffsetAndMetadata offset) and the seeked offset is valid, the consumer can/should use the leaderEpoch in the cached metadata for fetching?
>
> 53. OffsetAndMetadata. For backward compatibility, we need to support constructing OffsetAndMetadata without providing leaderEpoch. Could we define the default value of leaderEpoch if not provided and the semantics of that (e.g., skipping the epoch validation)?
>
> 54. I saw the following code in WorkerSinkTask in Connect. It saves the offset obtained through position(), which can be committed later. Since position() doesn't return the leaderEpoch, this can lead to a committed offset without a leaderEpoch. Not sure how common this usage is, but what's the recommendation for such users?
>
>     private class HandleRebalance implements ConsumerRebalanceListener {
>         @Override
>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
>             lastCommittedOffsets = new HashMap<>();
>             currentOffsets = new HashMap<>();
>             for (TopicPartition tp : partitions) {
>                 long pos = consumer.position(tp);
>                 lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
>
> 55. "With this KIP, the only case in which this is possible is if the consumer fetches from an offset earlier than the log start offset." Is that true? I guess a user could seek to a large offset without providing leaderEpoch, which can cause the offset to be larger than the log end offset during fetch?
>
> 56. In the schema for OffsetForLeaderEpochRequest, LeaderEpoch seems to be an existing field. Is LeaderEpochQuery the new field? The name is not very intuitive. It would be useful to document its meaning.
>
> 57. Should we deprecate the following API?
>     void seek(TopicPartition partition, long offset);
>
> Thanks,
>
> Jun
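As an illustration of the convenience method mentioned in item 54 (deriving the offsets to commit from a fetched batch), here is a minimal sketch. It does not use the Kafka client library: `FetchedRecord`, `CommitPosition`, and `NextOffsets` are hypothetical stand-ins, and partitions are identified by plain strings. The assumption is that the offset to commit is the last consumed offset plus one, paired with the leader epoch of that last consumed record.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for a fetched record; not Kafka's ConsumerRecord. */
final class FetchedRecord {
    final String topicPartition;          // e.g. "t-0"
    final long offset;
    final Optional<Integer> leaderEpoch;  // empty if the record carries no epoch

    FetchedRecord(String topicPartition, long offset, Optional<Integer> leaderEpoch) {
        this.topicPartition = topicPartition;
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }
}

/** Hypothetical commit position: next offset to read plus the epoch of the last consumed record. */
final class CommitPosition {
    final long nextOffset;
    final Optional<Integer> lastLeaderEpoch;

    CommitPosition(long nextOffset, Optional<Integer> lastLeaderEpoch) {
        this.nextOffset = nextOffset;
        this.lastLeaderEpoch = lastLeaderEpoch;
    }
}

final class NextOffsets {
    private NextOffsets() {}

    /** For each partition in the batch, returns (last consumed offset + 1, epoch of that record). */
    static Map<String, CommitPosition> of(List<FetchedRecord> batch) {
        Map<String, CommitPosition> result = new HashMap<>();
        for (FetchedRecord r : batch) {
            // Assumes records of one partition appear in offset order, so the last one wins.
            result.put(r.topicPartition, new CommitPosition(r.offset + 1, r.leaderEpoch));
        }
        return result;
    }
}
```

With a helper of this shape, a caller that processes whole batches can commit the returned map directly and needs no per-message bookkeeping.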
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hi, Jason,

Thanks for the updated KIP. Well thought-through. Just a few minor comments below.

50. For seek(TopicPartition partition, OffsetAndMetadata offset), I guess that under the covers it will make an OffsetsForLeaderEpoch request to determine if the seeked offset is still valid before fetching? If so, it would be useful to document this in the wiki.

51. Similarly, if the consumer fetch request gets FENCED_LEADER_EPOCH, I guess the consumer will also make an OffsetsForLeaderEpoch request to determine if the last consumed offset is still valid before fetching? If so, it would be useful to document this in the wiki.

52. "If the consumer seeks to the middle of the log, for example, then we will use the sentinel value -1 and the leader will skip the epoch validation." Is this true? If the consumer seeks using seek(TopicPartition partition, OffsetAndMetadata offset) and the seeked offset is valid, the consumer can/should use the leaderEpoch in the cached metadata for fetching?

53. OffsetAndMetadata. For backward compatibility, we need to support constructing OffsetAndMetadata without providing leaderEpoch. Could we define the default value of leaderEpoch if not provided and the semantics of that (e.g., skipping the epoch validation)?

54. I saw the following code in WorkerSinkTask in Connect. It saves the offset obtained through position(), which can be committed later. Since position() doesn't return the leaderEpoch, this can lead to a committed offset without a leaderEpoch. Not sure how common this usage is, but what's the recommendation for such users?

    private class HandleRebalance implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
            lastCommittedOffsets = new HashMap<>();
            currentOffsets = new HashMap<>();
            for (TopicPartition tp : partitions) {
                long pos = consumer.position(tp);
                lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));

55. "With this KIP, the only case in which this is possible is if the consumer fetches from an offset earlier than the log start offset." Is that true? I guess a user could seek to a large offset without providing leaderEpoch, which can cause the offset to be larger than the log end offset during fetch?

56. In the schema for OffsetForLeaderEpochRequest, LeaderEpoch seems to be an existing field. Is LeaderEpochQuery the new field? The name is not very intuitive. It would be useful to document its meaning.

57. Should we deprecate the following API?
    void seek(TopicPartition partition, long offset);

Thanks,

Jun

On Fri, Aug 3, 2018 at 9:32 AM, Jason Gustafson wrote:

> Hey All,
>
> I think I've addressed all pending review comments. If there is no additional feedback, I'll plan to start a vote thread next week.
>
> Thanks,
> Jason
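To make the backward-compatibility question in item 53 concrete, here is a minimal sketch of an offset holder whose leader epoch is optional. `OffsetWithEpoch` and `requiresEpochValidation()` are hypothetical names chosen for illustration; this is not the actual OffsetAndMetadata class, and the idea that a missing epoch means "skip epoch validation" is the semantics suggested in the question, taken here as an assumption.

```java
import java.util.Optional;

/** Hypothetical sketch of an offset holder with an optional leader epoch. */
final class OffsetWithEpoch {
    private final long offset;
    private final Optional<Integer> leaderEpoch;
    private final String metadata;

    /** Legacy-style constructor: no epoch supplied, so no epoch validation can be done. */
    OffsetWithEpoch(long offset) {
        this(offset, Optional.empty(), "");
    }

    OffsetWithEpoch(long offset, Optional<Integer> leaderEpoch, String metadata) {
        if (offset < 0) {
            throw new IllegalArgumentException("offset must be non-negative");
        }
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
        this.metadata = metadata;
    }

    long offset() { return offset; }

    Optional<Integer> leaderEpoch() { return leaderEpoch; }

    String metadata() { return metadata; }

    /** Assumed semantics: validation is only attempted when the caller provided an epoch. */
    boolean requiresEpochValidation() { return leaderEpoch.isPresent(); }
}
```

Existing callers that only pass an offset keep compiling, and the absence of an epoch is visible in the type rather than encoded as a magic number.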
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey All,

I think I've addressed all pending review comments. If there is no additional feedback, I'll plan to start a vote thread next week.

Thanks,
Jason

On Tue, Jul 31, 2018 at 9:46 AM, Dong Lin wrote:

> Hey Jason,
>
> Thanks for your reply. I will comment below.
>
> Regarding 1, we probably cannot simply rename both to `LeaderEpoch` because we already have a LeaderEpoch field in OffsetsForLeaderEpoch.
>
> Regarding 5, I don't feel strongly about this. I agree with the two benefits of having two error codes: 1) not having to refresh metadata when the consumer sees UNKNOWN_LEADER_EPOCH and 2) providing more information in the log for debugging. Whether or not these two benefits are useful enough to justify one more error code may be subjective. I will let you and others determine this.
>
> Regarding 6, yeah, overloading seek() looks good to me.
>
> Thanks,
> Dong
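The distinction between the two error codes discussed in this thread can be sketched as a broker-side comparison. This is a simplified illustration, not the broker implementation: `EpochCheck` and `EpochFencing` are hypothetical names, and it assumes that FENCED_LEADER_EPOCH means the requester's epoch is older than the broker's, while UNKNOWN_LEADER_EPOCH means it is newer. The sentinel value -1 is treated as "skip validation", as quoted from the KIP elsewhere in the thread.

```java
/** Hypothetical result of comparing a request's leader epoch with the broker's own. */
enum EpochCheck { NONE, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH }

final class EpochFencing {
    /** Sentinel meaning "no epoch supplied; skip validation". */
    static final int NO_EPOCH = -1;

    private EpochFencing() {}

    /** Assumed semantics: an older request epoch is fenced, a newer one is unknown to the broker. */
    static EpochCheck validate(int requestEpoch, int brokerEpoch) {
        if (requestEpoch == NO_EPOCH) {
            return EpochCheck.NONE;
        }
        if (requestEpoch < brokerEpoch) {
            return EpochCheck.FENCED_LEADER_EPOCH;   // requester is behind the broker
        }
        if (requestEpoch > brokerEpoch) {
            return EpochCheck.UNKNOWN_LEADER_EPOCH;  // broker is behind the requester
        }
        return EpochCheck.NONE;
    }

    /** A consumer that is behind needs fresh metadata; one that is ahead can simply retry. */
    static boolean shouldRefreshMetadata(EpochCheck result) {
        return result == EpochCheck.FENCED_LEADER_EPOCH;
    }
}
```

Merging the two codes into a single error would remove the information `shouldRefreshMetadata` relies on, which is the first of the two benefits listed above.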
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Jason,

Thanks for your reply. I will comment below.

Regarding 1, we probably cannot simply rename both to `LeaderEpoch` because we already have a LeaderEpoch field in OffsetsForLeaderEpoch.

Regarding 5, I don't feel strongly about this. I agree with the two benefits of having two error codes: 1) not having to refresh metadata when the consumer sees UNKNOWN_LEADER_EPOCH and 2) providing more information in the log for debugging. Whether or not these two benefits are useful enough to justify one more error code may be subjective. I will let you and others determine this.

Regarding 6, yeah, overloading seek() looks good to me.

Thanks,
Dong

On Mon, Jul 30, 2018 at 9:33 AM, Jason Gustafson wrote:

> Hey Dong,
>
> Thanks for the detailed review. Responses below:
>
> 1/2: Thanks for noticing the inconsistency. Would it be reasonable to simply call it LeaderEpoch for both APIs?
>
> 3: I agree it should be a map. I will update.
>
> 4: Fair point. I think we should always be able to identify an offset. Let's remove the Optional for now and reconsider if we find an unhandled case during implementation.
>
> 5: Yeah, I was thinking about this. The two error codes could be handled similarly, so we might merge them. Mainly I was thinking that it will be useful for consumers/replicas to know whether they are ahead of or behind the leader. For example, if a consumer sees UNKNOWN_LEADER_EPOCH, it need not refresh metadata. Or if a replica sees a FENCED_LEADER_EPOCH error, it could just stop fetching and await the LeaderAndIsr request that it is missing. It probably also makes debugging a little bit easier. I guess I'm a bit inclined to keep both error codes, but I'm open to reconsideration if you feel strongly. Another point to consider is whether we should continue using NOT_LEADER_FOR_PARTITION if a follower receives an unexpected fetch. The leader epoch would be different in this case so we could use one of the invalid epoch error codes instead since they contain more information.
>
> 6: I agree the name is not ideal in that scenario. What if we overloaded `seek`?
>
> 7: Sure, I will mention this.
>
> Thanks,
> Jason
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Dong,

Thanks for the detailed review. Responses below:

1/2: Thanks for noticing the inconsistency. Would it be reasonable to simply call it LeaderEpoch for both APIs?

3: I agree it should be a map. I will update.

4: Fair point. I think we should always be able to identify an offset. Let's remove the Optional for now and reconsider if we find an unhandled case during implementation.

5: Yeah, I was thinking about this. The two error codes could be handled similarly, so we might merge them. Mainly I was thinking that it will be useful for consumers/replicas to know whether they are ahead of or behind the leader. For example, if a consumer sees UNKNOWN_LEADER_EPOCH, it need not refresh metadata. Or if a replica sees a FENCED_LEADER_EPOCH error, it could just stop fetching and await the LeaderAndIsr request that it is missing. It probably also makes debugging a little bit easier. I guess I'm a bit inclined to keep both error codes, but I'm open to reconsideration if you feel strongly. Another point to consider is whether we should continue using NOT_LEADER_FOR_PARTITION if a follower receives an unexpected fetch. The leader epoch would be different in this case so we could use one of the invalid epoch error codes instead since they contain more information.

6: I agree the name is not ideal in that scenario. What if we overloaded `seek`?

7: Sure, I will mention this.

Thanks,
Jason

On Fri, Jul 27, 2018 at 6:17 PM, Dong Lin wrote:

> Hey Jason,
>
> Thanks for the update! I agree with the current proposal overall. I have some minor comments related to naming etc.
>
> 1) I don't feel strongly about this and will just leave it here for discussion. Would it be better to rename "CurrentLeaderEpoch" to "ExpectedLeaderEpoch" for the new field in the OffsetsForLeaderEpochRequest? The reason is that "CurrentLeaderEpoch" may not necessarily be the true current leader epoch if the consumer has stale metadata. "ExpectedLeaderEpoch" shows that this epoch is what the consumer expects on the broker, which may or may not be the true value.
>
> 2) Currently we add the field "LeaderEpoch" to FetchRequest and the field "CurrentLeaderEpoch" to OffsetsForLeaderEpochRequest. Given that both fields are compared with the leaderEpoch in the broker, would it be better to give them the same name?
>
> 3) Currently LogTruncationException.truncationOffset() returns an Optional to the user. Should it return an Optional of a map instead, to handle the scenario where the leaderEpoch of multiple partitions differs from the leaderEpoch in the broker?
>
> 4) Currently LogTruncationException.truncationOffset() returns an Optional value. Could you explain a bit more when it will return Optional.empty()? I am trying to understand whether it is simpler and reasonable to replace Optional.empty() with OffsetMetadata(offset=last_fetched_offset, leaderEpoch=-1).
>
> 5) Do we also need to add a new retriable exception for error code FENCED_LEADER_EPOCH? And do we need to define both FENCED_LEADER_EPOCH and UNKNOWN_LEADER_EPOCH? It seems that the current KIP uses these two error codes in the same way and the exception for these two error codes is not exposed to the user. Maybe we should combine them into one error, e.g. INVALID_LEADER_EPOCH?
>
> 6) For users who have turned off auto offset reset, when consumer.poll() throws LogTruncationException, it seems that the user will most likely call seekToCommitted(offset, leaderEpoch) where offset and leaderEpoch are obtained from LogTruncationException.truncationOffset(). In this case, the offset used here is not committed, which is inconsistent with the method name seekToCommitted(...). Would it be better to rename the method to e.g. seekToLastConsumedMessage()?
>
> 7) Per point 3 in Jun's comment, would it be useful to explicitly specify in the KIP that we will log the truncation event if the user has turned on an auto offset reset policy?
>
> Thanks,
> Dong
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Jason,

Thanks for the update! I agree with the current proposal overall. I have some minor comments related to naming etc.

1) I don't feel strongly about this and will just leave it here for discussion. Would it be better to rename "CurrentLeaderEpoch" to "ExpectedLeaderEpoch" for the new field in the OffsetsForLeaderEpochRequest? The reason is that "CurrentLeaderEpoch" may not be the true current leader epoch if the consumer has stale metadata. "ExpectedLeaderEpoch" shows that this epoch is what the consumer expects on the broker, which may or may not be the true value.

2) Currently we add the field "LeaderEpoch" to FetchRequest and the field "CurrentLeaderEpoch" to OffsetsForLeaderEpochRequest. Given that both fields are compared with the leaderEpoch in the broker, would it be better to give them the same name?

3) Currently LogTruncationException.truncationOffset() returns Optional to user. Should it return Optional> to handle the scenario where the leaderEpoch of multiple partitions differs from the leaderEpoch in the broker?

4) Currently LogTruncationException.truncationOffset() returns an Optional value. Could you explain a bit more when it will return Optional.empty()? I am trying to understand whether it is simpler and reasonable to replace Optional.empty() with OffsetMetadata(offset=last_fetched_offset, leaderEpoch=-1).

5) Do we also need to add a new retriable exception for the error code FENCED_LEADER_EPOCH? And do we need to define both FENCED_LEADER_EPOCH and UNKNOWN_LEADER_EPOCH? It seems that the current KIP uses these two error codes in the same way, and the exception for these two error codes is not exposed to the user. Maybe we should combine them into one error, e.g. INVALID_LEADER_EPOCH?

6) For users who have turned off auto offset reset, when consumer.poll() throws LogTruncationException, it seems that the user will most likely call seekToCommitted(offset, leaderEpoch), where offset and leaderEpoch are obtained from LogTruncationException.truncationOffset(). In this case, the offset used here is not committed, which is inconsistent with the method name seekToCommitted(...). Would it be better to rename the method to e.g. seekToLastConsumedMessage()?

7) Per point 3 in Jun's comment, would it be useful to explicitly specify in the KIP that we will log the truncation event if the user has turned on the auto offset reset policy?

Thanks,
Dong

On Fri, Jul 27, 2018 at 12:39 PM, Jason Gustafson wrote:

> Thanks Anna, you are right on both points. I updated the KIP.
>
> -Jason
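The question in point 5 is easier to weigh with the two error codes side by side. The following is a minimal sketch of the broker-side comparison, based on a reading of the KIP rather than on Kafka's source: `EpochCheck` and `LeaderEpochValidator` are hypothetical names, and the rule assumed here is that a request epoch lower than the broker's is fenced while a higher one is unknown to the broker. The `-1` sentinel that disables validation is the one mentioned elsewhere in this thread.

```java
// Hypothetical, simplified model of broker-side leader epoch validation.
// These names are illustrative only; they are not Kafka's internal classes.
enum EpochCheck { NONE, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH }

final class LeaderEpochValidator {
    // Sentinel meaning "the client did not supply an epoch".
    static final int NO_EPOCH = -1;

    static EpochCheck validate(int requestEpoch, int brokerEpoch) {
        if (requestEpoch == NO_EPOCH) {
            // No epoch supplied: the broker skips the validation.
            return EpochCheck.NONE;
        }
        if (requestEpoch < brokerEpoch) {
            // The client is behind: its metadata is stale and it must refresh.
            return EpochCheck.FENCED_LEADER_EPOCH;
        }
        if (requestEpoch > brokerEpoch) {
            // The broker is behind: it has not yet learned of the newer epoch.
            return EpochCheck.UNKNOWN_LEADER_EPOCH;
        }
        return EpochCheck.NONE;
    }
}
```

Under these assumptions the two codes tell the client which side is stale, which is one possible argument for keeping them separate even though the client retries in both cases.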
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Thanks Anna, you are right on both points. I updated the KIP.

-Jason

On Thu, Jul 26, 2018 at 2:08 PM, Anna Povzner wrote:

> Hi Jason,
>
> Thanks for the update. I agree with the current proposal.
>
> Two minor comments:
>
> 1) In the “API Changes” section, the first paragraph says that “users can catch the more specific exception type and use the new `seekToNearest()` API defined below.” Since LogTruncationException “will include the partitions that were truncated and the offset of divergence”, shouldn’t the client use seek(offset) to seek to the offset of divergence in response to the exception?
>
> 2) In the “Protocol Changes” section, the OffsetsForLeaderEpoch subsection says “Note that consumers will send a sentinel value (-1) for the current epoch and the broker will simply disregard that validation.” Is that still true with MetadataResponse containing the leader epoch?
>
> Thanks,
> Anna
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hi Jason,

Thanks for the update. I agree with the current proposal.

Two minor comments:

1) In the “API Changes” section, the first paragraph says that “users can catch the more specific exception type and use the new `seekToNearest()` API defined below.” Since LogTruncationException “will include the partitions that were truncated and the offset of divergence”, shouldn’t the client use seek(offset) to seek to the offset of divergence in response to the exception?

2) In the “Protocol Changes” section, the OffsetsForLeaderEpoch subsection says “Note that consumers will send a sentinel value (-1) for the current epoch and the broker will simply disregard that validation.” Is that still true with MetadataResponse containing the leader epoch?

Thanks,
Anna

On Wed, Jul 25, 2018 at 1:44 PM Jason Gustafson wrote:

> Hi All,
>
> I have made some updates to the KIP. As many of you know, a side project of mine has been specifying the Kafka replication protocol in TLA. You can check out the code here if you are interested: https://github.com/hachikuji/kafka-specification. In addition to uncovering a couple of unknown bugs in the replication protocol (e.g. https://issues.apache.org/jira/browse/KAFKA-7128), this has helped me validate the behavior in this KIP. In fact, the original version I proposed had a weakness. I initially suggested letting the leader validate the expected epoch at the fetch offset. This made sense for the consumer in the handling of unclean leader election, but it was not strong enough to protect the follower in all cases. In order to make advancement of the high watermark safe, for example, the leader actually needs to be sure that every follower in the ISR matches its own epoch.
>
> I attempted to fix this problem by treating the epoch in the fetch request slightly differently for consumers and followers. For consumers, it would be the expected epoch of the record at the fetch offset, and the leader would raise a LOG_TRUNCATION error if the expectation failed. For followers, it would be the current epoch and the leader would require that it match its own epoch. This was unsatisfying both because of the inconsistency in behavior and because the consumer was left with the weaker fencing that we already knew was insufficient for the replicas. Ultimately I decided that we should make the behavior consistent and that meant that the consumer needed to act more like a following replica. Instead of checking for truncation while fetching, the consumer should check for truncation after leader changes. After checking for truncation, the consumer can then use the current epoch when fetching and get the stronger protection that it provides. What this means is that the Metadata API must include the current leader epoch. Given the problems we have had around stale metadata and how challenging they have been to debug, I'm convinced that this is a good idea in any case and it resolves the inconsistent behavior in the Fetch API. The downside is that there will be some additional overhead upon leader changes, but I don't think it is a major concern since leader changes are rare and the OffsetsForLeaderEpoch request is cheap.
>
> This approach leaves the door open for some interesting follow-up improvements. For example, now that we have the leader epoch in the Metadata response, we can implement similar fencing for the Produce API. And now that the consumer can reason about truncation, we could consider having a configuration to expose records beyond the high watermark. This would let users accept weaker durability semantics in exchange for lower end-to-end latency. It is sort of like having an acks=0 option for the consumer. Neither of these options is included in this KIP; I am just mentioning them as potential work for the future.
>
> Finally, based on the discussion in this thread, I have added the seekToCommitted API for the consumer. Please take a look and let me know what you think.
>
> Thanks,
> Jason
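Anna's first comment suggests that a client which learns the offset of divergence can simply seek to it. A minimal sketch of that reaction follows, in plain Java with no Kafka dependency. `TruncationDetected` and `PositionTracker` are hypothetical stand-ins for the proposed LogTruncationException and for the consumer's position bookkeeping, and partitions are represented as plain strings. The exception carries one divergence offset per partition, which is an assumption made here for illustration and not the signature in the KIP.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the proposed LogTruncationException.
final class TruncationDetected extends RuntimeException {
    // Partition name -> first offset at which the consumer's log view diverges.
    final Map<String, Long> divergentOffsets;

    TruncationDetected(Map<String, Long> divergentOffsets) {
        super("log truncation detected for " + divergentOffsets.keySet());
        this.divergentOffsets = divergentOffsets;
    }
}

// Hypothetical stand-in for the consumer's per-partition positions.
final class PositionTracker {
    private final Map<String, Long> positions = new HashMap<>();

    void seek(String partition, long offset) {
        positions.put(partition, offset);
    }

    long position(String partition) {
        return positions.get(partition);
    }

    // Rewind every truncated partition to its offset of divergence.
    // Partitions not named in the exception keep their current position.
    void handle(TruncationDetected e) {
        for (Map.Entry<String, Long> entry : e.divergentOffsets.entrySet()) {
            seek(entry.getKey(), entry.getValue());
        }
    }
}
```

An application that cannot tolerate re-reading or skipping records would put its own recovery logic in place of the plain seek.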
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hi All,

I have made some updates to the KIP. As many of you know, a side project of mine has been specifying the Kafka replication protocol in TLA. You can check out the code here if you are interested: https://github.com/hachikuji/kafka-specification. In addition to uncovering a couple of unknown bugs in the replication protocol (e.g. https://issues.apache.org/jira/browse/KAFKA-7128), this has helped me validate the behavior in this KIP. In fact, the original version I proposed had a weakness. I initially suggested letting the leader validate the expected epoch at the fetch offset. This made sense for the consumer in the handling of unclean leader election, but it was not strong enough to protect the follower in all cases. In order to make advancement of the high watermark safe, for example, the leader actually needs to be sure that every follower in the ISR matches its own epoch.

I attempted to fix this problem by treating the epoch in the fetch request slightly differently for consumers and followers. For consumers, it would be the expected epoch of the record at the fetch offset, and the leader would raise a LOG_TRUNCATION error if the expectation failed. For followers, it would be the current epoch and the leader would require that it match its own epoch. This was unsatisfying both because of the inconsistency in behavior and because the consumer was left with the weaker fencing that we already knew was insufficient for the replicas. Ultimately I decided that we should make the behavior consistent and that meant that the consumer needed to act more like a following replica. Instead of checking for truncation while fetching, the consumer should check for truncation after leader changes. After checking for truncation, the consumer can then use the current epoch when fetching and get the stronger protection that it provides. What this means is that the Metadata API must include the current leader epoch. Given the problems we have had around stale metadata and how challenging they have been to debug, I'm convinced that this is a good idea in any case and it resolves the inconsistent behavior in the Fetch API. The downside is that there will be some additional overhead upon leader changes, but I don't think it is a major concern since leader changes are rare and the OffsetsForLeaderEpoch request is cheap.

This approach leaves the door open for some interesting follow-up improvements. For example, now that we have the leader epoch in the Metadata response, we can implement similar fencing for the Produce API. And now that the consumer can reason about truncation, we could consider having a configuration to expose records beyond the high watermark. This would let users accept weaker durability semantics in exchange for lower end-to-end latency. It is sort of like having an acks=0 option for the consumer. Neither of these options is included in this KIP; I am just mentioning them as potential work for the future.

Finally, based on the discussion in this thread, I have added the seekToCommitted API for the consumer. Please take a look and let me know what you think.

Thanks,
Jason

On Fri, Jul 20, 2018 at 2:34 PM, Guozhang Wang wrote:

> Hi Jason,
>
> The proposed API seems reasonable to me too. Could you please also update the wiki page (https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation) with a section, say "Workflow", on how the proposed API will be used together with the others for:
>
> 1. consumer callers handling a LogTruncationException.
> 2. consumer internals handling a retriable UnknownLeaderEpochException.
>
> Guozhang
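The check that the consumer would run after a leader change can be sketched as follows. This is a simplified model, not Kafka's implementation: `TruncationCheck` is a hypothetical name, the leader's epoch history is modelled as a map from epoch to the first offset written in that epoch, and the leader is assumed to answer an OffsetsForLeaderEpoch query with the end offset of the requested epoch (the start offset of the next higher epoch, or the log end offset if there is none). The consumer compares that end offset with its own position.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical, simplified model of the truncation check after a leader change.
final class TruncationCheck {

    // End offset of `epoch` as seen by the leader: the start offset of the
    // next higher epoch, or the log end offset when no higher epoch exists.
    static long endOffsetForEpoch(TreeMap<Integer, Long> epochStartOffsets,
                                  long logEndOffset,
                                  int epoch) {
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(epoch);
        return next == null ? logEndOffset : next.getValue();
    }

    // Returns the offset of divergence when the consumer's position lies
    // beyond what the leader holds for the consumer's last fetched epoch,
    // and an empty result when the position is still valid.
    static OptionalLong divergence(TreeMap<Integer, Long> epochStartOffsets,
                                   long logEndOffset,
                                   int lastFetchedEpoch,
                                   long position) {
        long end = endOffsetForEpoch(epochStartOffsets, logEndOffset, lastFetchedEpoch);
        return position > end ? OptionalLong.of(end) : OptionalLong.empty();
    }
}
```

For example, suppose the new leader's history is epoch 0 starting at offset 0 and epoch 2 starting at offset 50, with a log end offset of 90. A consumer that last fetched in epoch 1 and sits at position 80 read records the new leader never had, so the sketch reports divergence at offset 50. A consumer at position 70 in epoch 2 is unaffected.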
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hi Jason,

The proposed API seems reasonable to me too. Could you please also update the wiki page (https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation) with a section, say "Workflow", on how the proposed API will be used together with the others for:

1. consumer callers handling a LogTruncationException.
2. consumer internals handling a retriable UnknownLeaderEpochException.

Guozhang

On Tue, Jul 17, 2018 at 10:23 AM, Anna Povzner wrote:

> Hi Jason,
>
> I also like your proposal and agree that KafkaConsumer#seekToCommitted() is more intuitive as a way to initialize both the consumer's position and its fetch state.
>
> My understanding is that KafkaConsumer#seekToCommitted() is purely for clients who store their offsets externally, right? And we are still going to add KafkaConsumer#findOffsets() in this KIP as we discussed, so that the client can handle LogTruncationException?
>
> Thanks,
> Anna
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hi Jason,

I also like your proposal and agree that KafkaConsumer#seekToCommitted() is more intuitive as a way to initialize both the consumer's position and its fetch state.

My understanding is that KafkaConsumer#seekToCommitted() is purely for clients who store their offsets externally, right? And we are still going to add KafkaConsumer#findOffsets() in this KIP as we discussed, so that the client can handle LogTruncationException?

Thanks,
Anna

On Thu, Jul 12, 2018 at 3:57 PM Dong Lin wrote:

> Hey Jason,
>
> It is a great summary. The solution sounds good. I might have minor comments regarding the method name, but we can discuss those minor points later, after we reach consensus on the high-level API.
>
> Thanks,
> Dong
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Jason,

It is a great summary. The solution sounds good. I might have minor comments regarding the method name, but we can discuss those minor points later, after we reach consensus on the high-level API.

Thanks,
Dong

On Thu, Jul 12, 2018 at 11:41 AM, Jason Gustafson wrote:

> Hey Anna and Dong,
>
> Thanks a lot for the great discussion. I've been hanging back a bit because honestly the best option hasn't seemed clear. I agree with Anna's general observation that there is a distinction between the position of the consumer and its fetch state up to that position. If you think about it, a committed offset actually represents both of these. The metadata is used to initialize the state of the consumer application and the offset initializes the position. Additionally, we are extending the offset commit in this KIP to also include the last epoch fetched by the consumer, which is used to initialize the internal fetch state. Of course if you do an arbitrary `seek` and immediately commit offsets, then there won't be a last epoch to commit. This seems intuitive since there is no fetch state in this case. We only commit fetch state when we have it.
>
> So if we think about a committed offset as initializing both the consumer's position and its fetch state, then the gap in the API is evidently that we don't have a way to initialize the consumer to a committed offset. We do it implicitly of course for offsets stored in Kafka, but since external storage is a use case we support, we should have an explicit API as well. Perhaps something like this:
>
>     seekToCommitted(TopicPartition, OffsetAndMetadata)
>
> In this KIP, we are proposing to allow the `OffsetAndMetadata` object to include the leader epoch, so I think this would have the same effect as Anna's suggested `seekToRecord`. But perhaps it is a more natural fit given the current API? Furthermore, if we find a need for additional metadata in the offset commit API in the future, then we will just need to modify the `OffsetAndMetadata` object and we will not need a new `seek` API.
>
> With this approach, I think then we can leave the `position` API as it is. The position of the consumer is still just the next expected fetch offset. If a user needs to record additional state based on previous fetch progress, then they would use the result of the previous fetch to obtain it. This makes the dependence on fetch progress explicit. I think we could make this a little more convenient with a helper in the `ConsumerRecords` object, but I think that's more of a nice-to-have.
>
> Thoughts?
>
> By the way, I have been iterating a little bit on the replica side of this KIP. My initial proposal in fact did not have strong enough fencing to protect all of the edge cases. I believe the current proposal fixes the problems, but I am still verifying the model.
>
> Thanks,
> Jason
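Jason's point that a committed offset initializes both the position and the fetch state can be illustrated with a small model. Everything below is a hypothetical sketch in plain Java, not the consumer's real internals: `CommittedOffset` stands in for an `OffsetAndMetadata` extended with a leader epoch, and `PartitionState` stands in for whatever the consumer tracks per partition. A plain `seek` sets only the position and clears the epoch, whereas `seekToCommitted` restores both.

```java
import java.util.Optional;

// Hypothetical stand-in for OffsetAndMetadata extended with a leader epoch.
final class CommittedOffset {
    final long offset;                   // next offset the consumer will read
    final Optional<Integer> leaderEpoch; // epoch of the last consumed record, if any
    final String metadata;               // opaque application state

    CommittedOffset(long offset, Optional<Integer> leaderEpoch, String metadata) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
        this.metadata = metadata;
    }
}

// Hypothetical per-partition consumer state: a position plus fetch state.
final class PartitionState {
    long position;
    Optional<Integer> lastFetchedEpoch = Optional.empty();

    // An arbitrary seek sets the position only; there is no fetch state yet.
    void seek(long offset) {
        position = offset;
        lastFetchedEpoch = Optional.empty();
    }

    // Seeking to a committed offset restores the position and the fetch state.
    void seekToCommitted(CommittedOffset committed) {
        position = committed.offset;
        lastFetchedEpoch = committed.leaderEpoch;
    }

    // Consuming a record advances the position and records the record's epoch.
    void onRecordConsumed(long offset, int epoch) {
        position = offset + 1;
        lastFetchedEpoch = Optional.of(epoch);
    }

    // A commit captures whatever fetch state exists at that moment.
    CommittedOffset commit(String metadata) {
        return new CommittedOffset(position, lastFetchedEpoch, metadata);
    }
}
```

In this model, committing right after `seek` yields an empty epoch, which matches the remark that fetch state is committed only when it exists. An externally stored `CommittedOffset` can later be handed to `seekToCommitted` on a fresh `PartitionState` to resume with the epoch intact.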
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Anna and Dong,

Thanks a lot for the great discussion. I've been hanging back a bit because honestly the best option hasn't seemed clear. I agree with Anna's general observation that there is a distinction between the position of the consumer and its fetch state up to that position. If you think about it, a committed offset actually represents both of these. The metadata is used to initialize the state of the consumer application and the offset initializes the position. Additionally, we are extending the offset commit in this KIP to also include the last epoch fetched by the consumer, which is used to initialize the internal fetch state. Of course if you do an arbitrary `seek` and immediately commit offsets, then there won't be a last epoch to commit. This seems intuitive since there is no fetch state in this case. We only commit fetch state when we have it.

So if we think about a committed offset as initializing both the consumer's position and its fetch state, then the gap in the API is evidently that we don't have a way to initialize the consumer to a committed offset. We do it implicitly of course for offsets stored in Kafka, but since external storage is a use case we support, we should have an explicit API as well. Perhaps something like this:

seekToCommitted(TopicPartition, OffsetAndMetadata)

In this KIP, we are proposing to allow the `OffsetAndMetadata` object to include the leader epoch, so I think this would have the same effect as Anna's suggested `seekToRecord`. But perhaps it is a more natural fit given the current API? Furthermore, if we find a need for additional metadata in the offset commit API in the future, then we will just need to modify the `OffsetAndMetadata` object and we will not need a new `seek` API.

With this approach, I think we can then leave the `position` API as it is. The position of the consumer is still just the next expected fetch offset. If a user needs to record additional state based on previous fetch progress, then they would use the result of the previous fetch to obtain it. This makes the dependence on fetch progress explicit. I think we could make this a little more convenient with a helper in the `ConsumerRecords` object, but I think that's more of a nice-to-have.

Thoughts?

By the way, I have been iterating a little bit on the replica side of this KIP. My initial proposal in fact did not have strong enough fencing to protect all of the edge cases. I believe the current proposal fixes the problems, but I am still verifying the model.

Thanks,
Jason
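To make the "position versus fetch state" distinction concrete, here is a minimal sketch in Python of a toy single-partition consumer. It is not the Kafka client API: `ToyConsumer`, `seek_to_committed`, `on_record` and the `leader_epoch` field are hypothetical names that only model the behaviour described in Jason's message, where a plain seek carries no fetch state and a committed offset initializes both the position and the last fetched epoch.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class OffsetAndMetadata:
    """Toy stand-in for a committed offset: position plus optional fetch state."""
    offset: int                          # next offset the consumer will read
    metadata: str = ""                   # opaque application state
    leader_epoch: Optional[int] = None   # epoch of the last consumed record, if any


class ToyConsumer:
    """Hypothetical model of one partition's position and last fetched epoch."""

    def __init__(self) -> None:
        self.position: Optional[int] = None
        self.last_fetched_epoch: Optional[int] = None

    def seek(self, offset: int) -> None:
        # An arbitrary seek sets the position only; there is no fetch state yet.
        self.position = offset
        self.last_fetched_epoch = None

    def seek_to_committed(self, committed: OffsetAndMetadata) -> None:
        # A committed offset initializes both the position and the fetch state.
        self.position = committed.offset
        self.last_fetched_epoch = committed.leader_epoch

    def on_record(self, offset: int, leader_epoch: int) -> None:
        # Consuming a record advances the position and remembers its epoch.
        self.position = offset + 1
        self.last_fetched_epoch = leader_epoch

    def commit(self, metadata: str = "") -> OffsetAndMetadata:
        # The epoch is committed only when the consumer actually has fetch state.
        if self.position is None:
            raise ValueError("no position to commit")
        return OffsetAndMetadata(self.position, metadata, self.last_fetched_epoch)
```

In this model, a consumer that stores `commit()` results externally can hand the same object back to `seek_to_committed` after a restart, so no separate accessor for the epoch is needed.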
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Anna,

Thanks much for the explanation. Approach 1 also sounds good to me. I think findOffsets() is useful for users who don't use an automatic offset reset policy.

Just one more question. Since users who store offsets externally need to provide leaderEpoch to findOffsets(...), do we need an extra API for the user to get both offset and leaderEpoch, e.g. recordPosition()?

Thanks,
Dong
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hi Dong,

What I called “not covering all use cases” is what you call best-effort (not guaranteeing some corner cases). I think we are on the same page here.

I wanted to be clear in the API whether the consumer seeks to a position (offset) or to a record (offset, leader epoch). The only use-case of seeking to a record is seeking to a committed offset for a user who stores committed offsets externally. (Unless users find some other reason to seek to a record.) I thought it was possible to provide this functionality with findOffsets(offset, leaderEpoch) followed by a seek(offset). However, you are right that this will not handle the race condition where the non-divergent offset found by findOffsets() could change again before the consumer does the first fetch.

Regarding position(): if we add a position that returns (offset, leader epoch), this is specifically a position after a record that was actually consumed, or the position of a committed record. In that case, I still think it’s cleaner to get the record position of a consumed message from a new helper method in ConsumerRecords or from committed offsets.

I think all the use-cases could then be covered with:

(Approach 1)

seekToRecord(offset, leaderEpoch): this will just initialize/set the consumer state;

findOffsets(offset, leaderEpoch) returns {offset, leaderEpoch}

If we agree that the race condition is also a corner case, then I think we can cover the use-cases with:

(Approach 2)

findOffsets(offset, leaderEpoch) returns offset. We still want leader epoch as a parameter for the users who store their committed offsets externally.

I am actually now leaning more to approach 1, since it is more explicit, and maybe there are more use cases for it.

Thanks,

Anna
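As a rough illustration of what a findOffsets(offset, leaderEpoch) lookup could compute, the sketch below (Python, for brevity) resolves a consumer's (offset, leaderEpoch) pair against a leader's epoch history. The function names, the history layout as (epoch, start_offset) pairs, and the "end offset of the largest epoch not above the requested one" rule are assumptions modeled on the KIP-101 style lookup mentioned in this thread, not the actual client or broker code.

```python
from bisect import bisect_right
from typing import List, Tuple

# Leader epoch history as (epoch, start_offset) pairs, sorted by epoch.
EpochHistory = List[Tuple[int, int]]


def end_offset_for_epoch(history: EpochHistory, log_end_offset: int,
                         epoch: int) -> Tuple[int, int]:
    """Return (largest known epoch <= requested epoch, its exclusive end offset)."""
    epochs = [e for e, _ in history]
    i = bisect_right(epochs, epoch) - 1
    if i < 0:
        raise ValueError("requested epoch precedes the leader's history")
    found_epoch = history[i][0]
    # An epoch ends where the next one starts, or at the log end if it is the latest.
    end = history[i + 1][1] if i + 1 < len(history) else log_end_offset
    return found_epoch, end


def find_offsets(history: EpochHistory, log_end_offset: int,
                 offset: int, leader_epoch: int) -> Tuple[int, int]:
    """Return the assumed point of non-divergence as (offset, leaderEpoch)."""
    found_epoch, end = end_offset_for_epoch(history, log_end_offset, leader_epoch)
    # If the leader's epoch ends before our offset, our log view has diverged there.
    return min(offset, end), found_epoch
```

For example, suppose the current leader wrote offset 0 in epoch 0, offset 1 in epoch 1 and offset 2 in epoch 2, so its history is `[(0, 0), (1, 1), (2, 2)]` with log end offset 3. A consumer at offset 3 whose last consumed record had epoch 1 would get `(2, 1)` back, meaning it should resume from offset 2. Returning the pair corresponds to Approach 1; Approach 2 would return only the first element.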
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Anna,

Thanks for the comment. To answer your question, it seems that we can cover all cases in this KIP. As stated in the "Consumer Handling" section, the KIP-101 based approach will be used to derive the truncation offset from the 2-tuple (offset, leaderEpoch). This approach is best effort and it is inaccurate only in very rare scenarios (as described in KIP-279).

By using seek(offset, leaderEpoch), the consumer will still be able to follow this best-effort approach to detect log truncation and determine the truncation offset. On the other hand, if we use seek(offset), the consumer will not detect log truncation in some cases, which weakens the guarantee of this KIP. Does this make sense?

Thanks,
Dong
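The difference between seek(offset, leaderEpoch) and seek(offset) can be sketched as a single check. The Python function below is a hypothetical model, not broker code: it assumes the leader keeps its epoch history as (epoch, start_offset) pairs sorted by epoch, and that a fetch carrying no epoch simply cannot be validated.

```python
from typing import List, Optional, Tuple


def truncation_detected(history: List[Tuple[int, int]], log_end_offset: int,
                        fetch_offset: int, last_epoch: Optional[int]) -> bool:
    """Model of a leader validating a fetch position against its epoch history."""
    if last_epoch is None:
        # Offset-only seek: there is no epoch to compare, so nothing is detected.
        return False
    # The consumer's epoch ends where the first larger epoch starts on the leader,
    # or at the log end offset if no larger epoch exists.
    end = log_end_offset
    for epoch, start in history:
        if epoch > last_epoch:
            end = start
            break
    # Fetching beyond the end of the epoch we last read means our log view diverged.
    return fetch_offset > end
```

With a leader history of `[(0, 0), (1, 1), (2, 2)]` and log end offset 3, a fetch at offset 3 with last epoch 1 is flagged, while the same fetch with no epoch passes silently. That silent case is the weakened guarantee Dong describes.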
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Sorry, I hit "send" before finishing. Continuing...

2) Hiding most of the consumer's log truncation handling logic, with minimal exposure in the KafkaConsumer API. I was proposing this path.

Before answering your specific questions, I want to respond to your comment “In general, maybe we should discuss the final solution that covers all cases?”. With the current KIP, we don’t cover all cases of the consumer detecting log truncation, because the KIP proposes a leader epoch cache in the consumer that does not persist across restarts. Plus, we only store the last committed offset (either internally, or users can store it externally). This has a limitation: the consumer will not always be able to find the point of truncation, simply because we have a limited history (just one data point).

So, maybe we should first agree on whether we accept that storing the last committed offset/leader epoch has the limitation that the consumer will not be able to detect log truncation in all cases?

Thanks,

Anna
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hi Dong,

Thanks for the follow up! I finally have a much clearer understanding of where you are coming from.

You are right. The success of findOffsets()/finding a point of non-divergence depends on whether we have enough entries in the consumer's leader epoch cache. However, I think this is a fundamental limitation of having a leader epoch cache that does not persist across consumer restarts.

If we consider the general case where the consumer may or may not have this cache, then I see two paths:

1) Letting the user track the leader epoch history externally, and have more exposure to leader epoch and finding the point of non-divergence in the KafkaConsumer API. I understand this is the case you were talking about.
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Anna,

Thanks much for your detailed explanation and example! It does help me understand the difference between our understandings.

So it seems that the solution based on findOffsets() currently focuses mainly on the scenario where the consumer has a cached leaderEpoch -> offset mapping, whereas I was thinking about the general case where the consumer may or may not have this cache. I guess that is why we have different understandings here. I have some comments below.

3) The proposed solution using findOffsets(offset, leaderEpoch) followed by seek(offset) works if the consumer has the cached leaderEpoch -> offset mapping. But if we assume the consumer has this cache, do we need to have leaderEpoch in findOffsets(...)? Intuitively, findOffsets(offset) can also derive the leaderEpoch from the offset, just like the proposed solution does with seek(offset).

4) If the consumer does not have the cached leaderEpoch -> offset mapping, which is the case if the consumer is restarted on a new machine, then it is not clear what leaderEpoch would be included in the FetchRequest if the consumer does seek(offset). This is the case that motivates the first question of the previous email. In general, maybe we should discuss the final solution that covers all cases?

5) The second question in my previous email is related to the following paragraph:

"... In some cases, offsets returned from position() could be actual consumed messages by this consumer identified by {offset, leader epoch}. In other cases, position() returns offset that was not actually consumed. Suppose, the user calls position() for the last offset...".

I guess my point is that, if the user calls position() for the last offset and uses that offset in seek(...), then the user can probably just call Consumer#seekToEnd() without calling position() and seek(...). Similarly, the user can call Consumer#seekToBeginning() to seek to the earliest position without calling position() and seek(...). Thus position() only needs to return the actual consumed messages identified by {offset, leader epoch}. Does this make sense?

Thanks,
Dong
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hi Dong, Thanks for considering my suggestions. Based on your comments, I realized that my suggestion was not complete with regard to KafkaConsumer API vs. consumer-broker protocol. While I propose to keep KafkaConsumer#seek() unchanged and take offset only, the underlying consumer will send the next FetchRequest() to broker with offset and leaderEpoch if it is known (based on leader epoch cache in consumer) — note that this is different from the current KIP, which suggests to always send unknown leader epoch after seek(). This way, if the consumer and a broker agreed on the point of non-divergence, which is some {offset, leaderEpoch} pair, the new leader which causes another truncation (even further back) will be able to detect new divergence and restart the process of finding the new point of non-divergence. So, to answer your question, If the truncation happens just after the user calls KafkaConsumer#findOffsets(offset, leaderEpoch) followed by seek(offset), the user will not seek to the wrong position without knowing that truncation has happened, because the consumer will get another truncation error, and seek again. I am afraid, I did not understand your second question. Let me summarize my suggestions again, and then give an example to hopefully make my suggestions more clear. Also, the last part of my example shows how the use-case in your first question will work. If it does not answer your second question, would you mind clarifying? I am also focusing on the case of a consumer having enough entries in the cache. The case of restarting from committed offset either stored externally or internally will probably need to be discussed more. Let me summarize my suggestion again: 1) KafkaConsumer#seek() and KafkaConsumer#position() remains unchanged 2) New KafkaConsumer#findOffsets() takes {offset, leaderEpoch} pair per topic partition and returns offset per topic partition. 
3) FetchRequest() to broker after KafkaConsumer#seek() will contain the offset set by seek and leaderEpoch that corresponds to the offset based on leader epoch cache in the consumer.

The rest of this e-mail is a long and contrived example with several log truncations and unclean leader elections to illustrate the API and your first use-case. Suppose we have three brokers. Initially, Brokers A, B, and C each have one message at offset 0 with leader epoch 0. Then, Broker A goes down for some time. Broker B becomes a leader with epoch 1, and writes messages to offsets 1 and 2. Broker C fetches offset 1, but before fetching offset 2, becomes a leader with leader epoch 2 and writes a message at offset 2. Here is the state of brokers at this point:

> Broker A:
> offset 0, epoch 0 <- leader
> goes down...

> Broker B:
> offset 0, epoch 0
> offset 1, epoch 1 <- leader
> offset 2, epoch 1

> Broker C:
> offset 0, epoch 0
> offset 1, epoch 1
> offset 2, epoch 2 <- leader

Before Broker C becomes a leader with leader epoch 2, the consumer consumed the following messages from broker A and broker B: {offset=0, leaderEpoch=0}, {offset=1, leaderEpoch=1}, {offset=2, leaderEpoch=1}.

Consumer’s leader epoch cache at this point contains the following entries:

(leaderEpoch=0, startOffset=0)
(leaderEpoch=1, startOffset=1)
endOffset = 3

Then, broker B becomes the follower of broker C, truncates and starts fetching from offset 2. Consumer sends FetchRequest(offset=3, leaderEpoch=1) and gets a LOG_TRUNCATION error from broker C. In response, the client calls KafkaConsumer#findOffsets(offset=3, leaderEpoch=1). The underlying consumer sends OffsetsForLeaderEpoch(leaderEpoch=1), and broker C responds with {leaderEpoch=1, endOffset=2}. So, KafkaConsumer#findOffsets(offset=3, leaderEpoch=1) returns offset=2. In response, the consumer calls KafkaConsumer#seek(offset=2) followed by poll(), which results in FetchRequest(offset=2, leaderEpoch=1) to broker C.
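Point 3) of the summary can be sketched as a lookup in the consumer's leader epoch cache. This is only an illustration, not the KIP's implementation: the class `FetchEpochSketch`, the method `epochForFetch`, and the choice to send the epoch of the previous message (offset - 1) are assumptions, picked because they reproduce the two requests in the example, FetchRequest(offset=3, leaderEpoch=1) and FetchRequest(offset=2, leaderEpoch=1). An empty result stands for the "unknown leader epoch" case, for instance a consumer that has no cache yet.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical consumer-side helper; none of these names come from the KIP. */
final class FetchEpochSketch {

    /**
     * Picks the leader epoch to attach to a fetch at fetchOffset.
     *
     * @param epochByStartOffset cache: first offset fetched in an epoch -> that epoch
     * @param cachedEndOffset    one past the last offset the cache knows about
     * @param fetchOffset        the position set by seek()
     * @return the epoch of the previous message, or empty if the cache cannot tell
     */
    static Optional<Integer> epochForFetch(TreeMap<Long, Integer> epochByStartOffset,
                                           long cachedEndOffset,
                                           long fetchOffset) {
        long previous = fetchOffset - 1;
        // No previous message, or the previous message lies outside what was consumed.
        if (previous < 0 || previous >= cachedEndOffset) {
            return Optional.empty();
        }
        // The epoch whose start offset is the largest one not exceeding 'previous'.
        Map.Entry<Long, Integer> entry = epochByStartOffset.floorEntry(previous);
        if (entry == null) {
            return Optional.empty();
        }
        return Optional.of(entry.getValue());
    }

    public static void main(String[] args) {
        // Cache from the example: (leaderEpoch=0, startOffset=0), (leaderEpoch=1, startOffset=1), endOffset=3.
        TreeMap<Long, Integer> cache = new TreeMap<>();
        cache.put(0L, 0);
        cache.put(1L, 1);

        System.out.println("fetch at 3 -> epoch " + epochForFetch(cache, 3L, 3L));
        System.out.println("fetch at 2 -> epoch " + epochForFetch(cache, 3L, 2L));
        // A restarted consumer with an empty cache has nothing to send.
        System.out.println("empty cache -> epoch " + epochForFetch(new TreeMap<>(), 0L, 2L));
    }
}
```

The empty-cache branch is the open case in this thread: with no cached mapping, the sketch has nothing better to offer than an unknown epoch.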
I will continue with this example with the goal to answer your first question about truncation just after findOffsets() followed by seek():

Suppose brokers B and C go down, and broker A comes up and becomes a leader with leader epoch 3, and writes a message to offset 1. Suppose this happens before the consumer gets a response from broker C to the previous fetch request: FetchRequest(offset=2, leaderEpoch=1). Consumer re-sends FetchRequest(offset=2, leaderEpoch=1) to broker A, which returns a LOG_TRUNCATION error, because broker A's leader epoch 3 is greater than the leader epoch in the FetchRequest, and epoch 3 starts at offset 1, which is less than offset 2 in the FetchRequest. In response, the user calls KafkaConsumer#findOffsets(offset=2, leaderEpoch=1). The underlying consumer sends OffsetsForLeaderEpoch(leaderEpoch=1), and broker A responds with {leaderEpoch=0, endOffset=1}; the underlying consumer finds leaderEpoch = 0 in its cache with end offset == 1, which results in KafkaConsumer#findOffsets(offset=2, leaderEpoch=1) returning offset = 1. In response, the user calls KafkaConsumer#seek(offset=1) followed by poll(), which results in
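The two lookups in this example can be replayed with a small stand-alone sketch. Everything in it is hypothetical scaffolding rather than Kafka code: `endOffsetFor` imitates how a broker might answer OffsetsForLeaderEpoch from its own epoch -> start offset table, and `findOffset` imitates the consumer-side step that combines the broker's answer with the consumer's cache. The broker tables in `main` are read off the example (broker C: epochs 0, 1, 2 starting at offsets 0, 1, 2; broker A after the second unclean election: epoch 0 at offset 0 and epoch 3 at offset 1).

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical replay of the findOffsets() walk-through; not the KIP's implementation. */
final class FindOffsetsSketch {

    /** Stand-in for one OffsetsForLeaderEpoch answer. */
    static final class EpochEndOffset {
        final int leaderEpoch;
        final long endOffset;

        EpochEndOffset(int leaderEpoch, long endOffset) {
            this.leaderEpoch = leaderEpoch;
            this.endOffset = endOffset;
        }
    }

    /**
     * Finds the largest known epoch at or below requestedEpoch and returns it with its end
     * offset, i.e. the start offset of the next epoch, or the log end offset if there is none.
     * epochStarts maps leader epoch -> first offset written in that epoch.
     */
    static EpochEndOffset endOffsetFor(TreeMap<Integer, Long> epochStarts,
                                       long logEndOffset,
                                       int requestedEpoch) {
        Map.Entry<Integer, Long> floor = epochStarts.floorEntry(requestedEpoch);
        if (floor == null) {
            throw new IllegalArgumentException("no epoch at or below " + requestedEpoch);
        }
        Map.Entry<Integer, Long> next = epochStarts.higherEntry(floor.getKey());
        long endOffset = (next != null) ? next.getValue() : logEndOffset;
        return new EpochEndOffset(floor.getKey(), endOffset);
    }

    /** Consumer side: never go past the requested offset or past either side's end offset. */
    static long findOffset(TreeMap<Integer, Long> consumerCache,
                           long consumerEndOffset,
                           long offset,
                           int leaderEpoch,
                           EpochEndOffset fromBroker) {
        long result = Math.min(offset, fromBroker.endOffset);
        // The broker fell back to an older epoch: also respect where the consumer saw it end.
        if (fromBroker.leaderEpoch != leaderEpoch && consumerCache.containsKey(fromBroker.leaderEpoch)) {
            EpochEndOffset local = endOffsetFor(consumerCache, consumerEndOffset, fromBroker.leaderEpoch);
            result = Math.min(result, local.endOffset);
        }
        return result;
    }

    public static void main(String[] args) {
        TreeMap<Integer, Long> consumerCache = new TreeMap<>();
        consumerCache.put(0, 0L);
        consumerCache.put(1, 1L);

        TreeMap<Integer, Long> brokerC = new TreeMap<>();
        brokerC.put(0, 0L);
        brokerC.put(1, 1L);
        brokerC.put(2, 2L);
        EpochEndOffset fromC = endOffsetFor(brokerC, 3L, 1);
        System.out.println("findOffsets(3, 1) via broker C = " + findOffset(consumerCache, 3L, 3L, 1, fromC));

        TreeMap<Integer, Long> brokerA = new TreeMap<>();
        brokerA.put(0, 0L);
        brokerA.put(3, 1L);
        EpochEndOffset fromA = endOffsetFor(brokerA, 2L, 1);
        System.out.println("findOffsets(2, 1) via broker A = " + findOffset(consumerCache, 3L, 2L, 1, fromA));
    }
}
```

With the tables above, the sketch arrives at the same offsets as the text: 2 after the first truncation and 1 after the second.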
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Anna,

Thanks much for the thoughtful reply. It makes sense to differentiate between "seeking to a message" and "seeking to a position". I have two questions here:

- For the "seeking to a message" use-case, with the proposed approach the user needs to call findOffsets(offset, leaderEpoch) followed by seek(offset). If message truncation and message append happen immediately after findOffsets(offset, leaderEpoch) but before seek(offset), it seems that the user will seek to the wrong message without knowing that the truncation has happened. Would this be a problem?

- For the "seeking to a position" use-case, it seems that there can be two positions, i.e. earliest and latest. So these two cases can be fulfilled by Consumer.seekToBeginning() and Consumer.seekToEnd(). Then it seems that the user will only need to call position() and seek() for the "seeking to a message" use-case?

Thanks,
Dong

On Wed, Jul 4, 2018 at 12:33 PM, Anna Povzner wrote: > Hi Jason and Dong, > > > I’ve been thinking about your suggestions and discussion regarding > position(), seek(), and new proposed API. > > > Here is my thought process why we should keep position() and seek() API > unchanged. > > > I think we should separate {offset, leader epoch} that uniquely identifies > a message from an offset that is a position. In some cases, offsets > returned from position() could be actual consumed messages by this consumer > identified by {offset, leader epoch}. In other cases, position() returns > offset that was not actually consumed. Suppose, the user calls position() > for the last offset. Suppose we return {offset, leader epoch} of the > message currently in the log. Then, the message gets truncated before > consumer’s first poll(). It does not make sense for poll() to fail in this > case, because the log truncation did not actually happen from the consumer > perspective.
On the other hand, as the KIP proposes, it makes sense for the > committed() method to return {offset, leader epoch} because those offsets > represent actual consumed messages. > > > The same argument applies to the seek() method — we are not seeking to a > message, we are seeking to a position. > > > I like the proposal to add KafkaConsumer#findOffsets() API. I am assuming > something like: > > Map findOffsets(Map > offsetsToSearch) > > Similar to seek() and position(), I think findOffsets() should return > offset without leader epoch, because what we want is the offset that we > think is closest to the not divergent message from the given consumed > message. Until the consumer actually fetches the message, we should not let > the consumer store the leader epoch for a message it did not consume. > > > So, the workflow will be: > > 1) The user gets LogTruncationException with {offset, leader epoch of the > previous message} (whatever we send with new FetchRecords request). > > 2) offset = findOffsets(tp -> {offset, leader epoch}) > > 3) seek(offset) > > > For the use-case where the users store committed offsets externally: > > 1) Such users would have to track the leader epoch together with an offset. > Otherwise, there is no way to detect later what leader epoch was associated > with the message. I think it’s reasonable to ask that from users if they > want to detect log truncation. Otherwise, they will get the current > behavior. > > > If the users currently get an offset to be stored using position(), I see > two possibilities. First, they call save offset returned from position() > that they call before poll(). In that case, it would not be correct to > store {offset, leader epoch} if we would have changed position() to return > {offset, leader epoch} since actual fetched message could be different > (from the example I described earlier). So, it would be more correct to > call position() after poll(). 
However, the user already gets > ConsumerRecords at this point, from which the user can extract {offset, > leader epoch} of the last message. > > > So, I like the idea of adding a helper method to ConsumerRecords, as Jason > proposed, something like: > > public OffsetAndEpoch lastOffsetWithLeaderEpoch(), where OffsetAndEpoch is > a data struct holding {offset, leader epoch}. > > > In this case, we would advise the user to follow the workflow: poll(), get > {offset, leader epoch} from ConsumerRecords#lastOffsetWithLeaderEpoch(), > save offset and leader epoch, process records. > > > 2) When the user needs to seek to the last committed offset, they call new > findOffsets(saved offset, leader epoch), and then seek(offset). > > > What do you think? > > > Thanks, > > Anna > > > On Tue, Jul 3, 2018 at 4:06 PM Dong Lin wrote: > > > Hey Jason, > > > > Thanks much for your thoughtful explanation. > > > > Yes the solution using findOffsets(offset, leaderEpoch) also works. The > > advantage of this solution it adds only one API instead of two APIs. The > > concern is that its usage seems a bit more clumsy for advanced users. > More > > specifically, advanced users who store offsets externally will
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hi, Jason,

Thanks for the KIP. Looks good overall. Just a few minor comments below.

1. "As the consumer is fetching from a partition, it will keep a small cache of the recent epochs that were fetched for each partition." Do we need to cache more than one leader epoch? Also, during consumer failover, initially, only the last epoch will be available.

2. "This KIP has implications for the consumer's offset reset policy, which defines what the consumer should do if its fetch offset becomes out of range. With this KIP, the only case in which this is possible is if the consumer fetches from an offset earlier than the log start offset." If the fetch epoch matches that in the leader, but the offset is larger than the leader's HW, should we still treat it as offset out of range?

3. "We propose in this KIP to change the behavior for both the "earliest" and "latest" reset modes to do this automatically as long as the message format supports lookup by leader epoch." It will probably be useful to indicate to the user that a reset has happened. So, it's probably useful to at least log this in the client.

4. "If the user ignores the exception, we will continue fetching from the current offset, but we will drop the last fetched offset metadata from the new FetchRequest so that we do not get the same log truncation error." Is it better to do this or keep failing?

5. LogTruncationException: Should we add an error code for that?

6. "We have added fields for the leader epoch and the timestamp." It seems that we only added the leader epoch?

Jun

On Mon, Jun 25, 2018 at 9:17 AM, Jason Gustafson wrote: > Hey All, > > I wrote up a KIP to handle one more edge case in the replication protocol > and to support better handling of truncation in the consumer when unclean > leader election is enabled. Let me know what you think.
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > 320%3A+Allow+fetchers+to+detect+and+handle+log+truncation > > Thanks to Anna Povzner and Dong Lin for initial feedback. > > Thanks, > Jason >
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hi Jason and Dong, I’ve been thinking about your suggestions and discussion regarding position(), seek(), and new proposed API. Here is my thought process why we should keep position() and seek() API unchanged. I think we should separate {offset, leader epoch} that uniquely identifies a message from an offset that is a position. In some cases, offsets returned from position() could be actual consumed messages by this consumer identified by {offset, leader epoch}. In other cases, position() returns offset that was not actually consumed. Suppose, the user calls position() for the last offset. Suppose we return {offset, leader epoch} of the message currently in the log. Then, the message gets truncated before consumer’s first poll(). It does not make sense for poll() to fail in this case, because the log truncation did not actually happen from the consumer perspective. On the other hand, as the KIP proposes, it makes sense for the committed() method to return {offset, leader epoch} because those offsets represent actual consumed messages. The same argument applies to the seek() method — we are not seeking to a message, we are seeking to a position. I like the proposal to add KafkaConsumer#findOffsets() API. I am assuming something like: Map findOffsets(Map offsetsToSearch) Similar to seek() and position(), I think findOffsets() should return offset without leader epoch, because what we want is the offset that we think is closest to the not divergent message from the given consumed message. Until the consumer actually fetches the message, we should not let the consumer store the leader epoch for a message it did not consume. So, the workflow will be: 1) The user gets LogTruncationException with {offset, leader epoch of the previous message} (whatever we send with new FetchRecords request). 
2) offset = findOffsets(tp -> {offset, leader epoch}) 3) seek(offset) For the use-case where the users store committed offsets externally: 1) Such users would have to track the leader epoch together with an offset. Otherwise, there is no way to detect later what leader epoch was associated with the message. I think it’s reasonable to ask that from users if they want to detect log truncation. Otherwise, they will get the current behavior. If the users currently get an offset to be stored using position(), I see two possibilities. First, they call save offset returned from position() that they call before poll(). In that case, it would not be correct to store {offset, leader epoch} if we would have changed position() to return {offset, leader epoch} since actual fetched message could be different (from the example I described earlier). So, it would be more correct to call position() after poll(). However, the user already gets ConsumerRecords at this point, from which the user can extract {offset, leader epoch} of the last message. So, I like the idea of adding a helper method to ConsumerRecords, as Jason proposed, something like: public OffsetAndEpoch lastOffsetWithLeaderEpoch(), where OffsetAndEpoch is a data struct holding {offset, leader epoch}. In this case, we would advise the user to follow the workflow: poll(), get {offset, leader epoch} from ConsumerRecords#lastOffsetWithLeaderEpoch(), save offset and leader epoch, process records. 2) When the user needs to seek to the last committed offset, they call new findOffsets(saved offset, leader epoch), and then seek(offset). What do you think? Thanks, Anna On Tue, Jul 3, 2018 at 4:06 PM Dong Lin wrote: > Hey Jason, > > Thanks much for your thoughtful explanation. > > Yes the solution using findOffsets(offset, leaderEpoch) also works. The > advantage of this solution it adds only one API instead of two APIs. The > concern is that its usage seems a bit more clumsy for advanced users. 
More > specifically, advanced users who store offsets externally will always need > to call findOffsets() before calling seek(offset) during consumer > initialization. And those advanced users will need to manually keep track > of the leaderEpoch of the last ConsumerRecord. > > The other solution may be more user-friendly for advanced users is to add > two APIs, `void seek(offset, leaderEpoch)` and `(offset, epoch) = > offsetEpochs(topicPartition)`. > > I kind of prefer the second solution because it is easier to use for > advanced users. If we need to expose leaderEpoch anyway to safely identify > a message, it may be conceptually simpler to expose it directly in > seek(...) rather than requiring one more translation using > findOffsets(...). But I am also OK with the first solution if other > developers also favor that one :) > > Thanks, > Dong > > > On Mon, Jul 2, 2018 at 11:10 AM, Jason Gustafson > wrote: > > > Hi Dong, > > > > Thanks, I've been thinking about your suggestions a bit. It is > challenging > > to make this work given the current APIs. One of the difficulties is that > > we don't have an API to find the leader epoch for a given offset at the
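The poll(), save, process workflow that Anna describes in this message could look roughly like the following. It is a sketch under assumptions: `Rec` stands in for ConsumerRecord, `OffsetAndEpoch` and `lastOffsetWithLeaderEpoch` mirror the helper proposed in the message (it is a proposal in this thread, not an existing consumer API), and the external store is just an in-memory map keyed by a made-up partition name.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical illustration of tracking {offset, leader epoch} for externally stored offsets. */
final class ExternalOffsetSketch {

    /** Stand-in for a consumed record: only the fields the workflow needs. */
    static final class Rec {
        final long offset;
        final int leaderEpoch;
        final String value;

        Rec(long offset, int leaderEpoch, String value) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
            this.value = value;
        }
    }

    /** Data struct holding {offset, leader epoch}, as named in the proposal. */
    static final class OffsetAndEpoch {
        final long offset;
        final int leaderEpoch;

        OffsetAndEpoch(long offset, int leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }
    }

    /** Returns {offset, leader epoch} of the last record in the batch, or empty for an empty batch. */
    static Optional<OffsetAndEpoch> lastOffsetWithLeaderEpoch(List<Rec> batch) {
        if (batch.isEmpty()) {
            return Optional.empty();
        }
        Rec last = batch.get(batch.size() - 1);
        return Optional.of(new OffsetAndEpoch(last.offset, last.leaderEpoch));
    }

    public static void main(String[] args) {
        Map<String, OffsetAndEpoch> externalStore = new HashMap<>();

        // 1) "poll()": here a fixed batch instead of a real fetch.
        List<Rec> batch = Arrays.asList(new Rec(0, 0, "a"), new Rec(1, 1, "b"), new Rec(2, 1, "c"));

        // 2) Save offset and leader epoch of the last record together.
        lastOffsetWithLeaderEpoch(batch).ifPresent(last -> externalStore.put("topic-0", last));

        // 3) Process the records.
        for (Rec rec : batch) {
            System.out.println("processing " + rec.value);
        }

        // On restart, the saved pair is what would be handed to findOffsets() before seek().
        OffsetAndEpoch saved = externalStore.get("topic-0");
        System.out.println("saved offset=" + saved.offset + ", leaderEpoch=" + saved.leaderEpoch);
    }
}
```

Storing the pair rather than the bare offset is the point of the workflow: without the epoch there is nothing to compare against when checking for truncation later.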
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Jason,

Thanks much for your thoughtful explanation.

Yes, the solution using findOffsets(offset, leaderEpoch) also works. The advantage of this solution is that it adds only one API instead of two. The concern is that its usage seems a bit more clumsy for advanced users. More specifically, advanced users who store offsets externally will always need to call findOffsets() before calling seek(offset) during consumer initialization. And those advanced users will need to manually keep track of the leaderEpoch of the last ConsumerRecord.

The other solution, which may be more user-friendly for advanced users, is to add two APIs, `void seek(offset, leaderEpoch)` and `(offset, epoch) = offsetEpochs(topicPartition)`.

I kind of prefer the second solution because it is easier to use for advanced users. If we need to expose leaderEpoch anyway to safely identify a message, it may be conceptually simpler to expose it directly in seek(...) rather than requiring one more translation using findOffsets(...). But I am also OK with the first solution if other developers also favor that one :)

Thanks,
Dong

On Mon, Jul 2, 2018 at 11:10 AM, Jason Gustafson wrote: > Hi Dong, > > Thanks, I've been thinking about your suggestions a bit. It is challenging > to make this work given the current APIs. One of the difficulties is that > we don't have an API to find the leader epoch for a given offset at the > moment. So if the user does a seek to offset 5, then we'll need a new API > to find the corresponding epoch in order to fulfill the new position() API. > Potentially we could modify ListOffsets to enable finding the leader epoch, > but I am not sure it is worthwhile. Perhaps it is reasonable for advanced > usage to expect that the epoch information, if needed, will be extracted > from the records directly? It might make sense to expose a helper in > `ConsumerRecords` to make this a little easier though.
> > Alternatively, if we think it is important to have this information exposed > directly, we could create batch APIs to solve the naming problem. For > example: > > Map positions(); > void seek(Map positions); > > However, I'm actually leaning toward leaving the seek() and position() APIs > unchanged. Instead, we can add a new API to search for offset by timestamp > or by offset/leader epoch. Let's say we call it `findOffsets`. If the user > hits a log truncation error, they can use this API to find the closest > offset and then do a seek(). At the same time, we deprecate the > `offsetsForTimes` APIs. We now have two use cases which require finding > offsets, so I think we should make this API general and leave the door open > for future extensions. > > By the way, I'm unclear about the desire to move part of this functionality > to AdminClient. Guozhang suggested this previously, but I think it only > makes sense for cross-cutting capabilities such as topic creation. If we > have an API which is primarily useful by consumers, then I think that's > where it should be exposed. The AdminClient also has its own API integrity > and should not become a dumping ground for advanced use cases. I'll update > the KIP with the `findOffsets` API suggested above and we can see if it > does a good enough job of keeping the API simple for common cases. > > Thanks, > Jason > > > On Sat, Jun 30, 2018 at 4:39 AM, Dong Lin wrote: > > > Hey Jason, > > > > Regarding seek(...), it seems that we want an API for user to initialize > > consumer with (offset, leaderEpoch) and that API should allow throwing > > PartitionTruncationException. Suppose we agree on this, then > > seekToNearest() is not sufficient because it will always swallow > > PartitionTruncationException. Here we have two options. The first option > is > > to add API offsetsForLeaderEpochs() to translate (leaderEpoch, offset) to > > offset. The second option is to have add seek(offset, leaderEpoch). 
It > > seems that second option may be more simpler because it makes it clear > that > > (offset, leaderEpoch) will be used to identify consumer's position in a > > partition. And user only needs to handle PartitionTruncationException > from > > the poll(). In comparison the first option seems a bit harder to use > > because user have to also handle the PartitionTruncationException if > > offsetsForLeaderEpochs() returns different offset from user-provided > > offset. What do you think? > > > > If we decide to add API seek(offset, leaderEpoch), then we can decide > > whether and how to add API to translate (offset, leaderEpoch) to offset. > It > > seems that this API will be needed by advanced user to don't want auto > > offset reset (so that it can be notified) but still wants to reset offset > > to closest. For those users if probably makes sense to only have the API > in > > AdminClient. offsetsForTimes() seems like a common API that will be > needed > > by user's of consumer in general, so it may be more reasonable to stay in > > the consumer API. I don't have a strong opinion on whether >
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hi Dong, Thanks, I've been thinking about your suggestions a bit. It is challenging to make this work given the current APIs. One of the difficulties is that we don't have an API to find the leader epoch for a given offset at the moment. So if the user does a seek to offset 5, then we'll need a new API to find the corresponding epoch in order to fulfill the new position() API. Potentially we could modify ListOffsets to enable finding the leader epoch, but I am not sure it is worthwhile. Perhaps it is reasonable for advanced usage to expect that the epoch information, if needed, will be extracted from the records directly? It might make sense to expose a helper in `ConsumerRecords` to make this a little easier though. Alternatively, if we think it is important to have this information exposed directly, we could create batch APIs to solve the naming problem. For example: Map positions(); void seek(Map positions); However, I'm actually leaning toward leaving the seek() and position() APIs unchanged. Instead, we can add a new API to search for offset by timestamp or by offset/leader epoch. Let's say we call it `findOffsets`. If the user hits a log truncation error, they can use this API to find the closest offset and then do a seek(). At the same time, we deprecate the `offsetsForTimes` APIs. We now have two use cases which require finding offsets, so I think we should make this API general and leave the door open for future extensions. By the way, I'm unclear about the desire to move part of this functionality to AdminClient. Guozhang suggested this previously, but I think it only makes sense for cross-cutting capabilities such as topic creation. If we have an API which is primarily useful by consumers, then I think that's where it should be exposed. The AdminClient also has its own API integrity and should not become a dumping ground for advanced use cases. 
I'll update the KIP with the `findOffsets` API suggested above and we can see if it does a good enough job of keeping the API simple for common cases. Thanks, Jason On Sat, Jun 30, 2018 at 4:39 AM, Dong Lin wrote: > Hey Jason, > > Regarding seek(...), it seems that we want an API for user to initialize > consumer with (offset, leaderEpoch) and that API should allow throwing > PartitionTruncationException. Suppose we agree on this, then > seekToNearest() is not sufficient because it will always swallow > PartitionTruncationException. Here we have two options. The first option is > to add API offsetsForLeaderEpochs() to translate (leaderEpoch, offset) to > offset. The second option is to have add seek(offset, leaderEpoch). It > seems that second option may be more simpler because it makes it clear that > (offset, leaderEpoch) will be used to identify consumer's position in a > partition. And user only needs to handle PartitionTruncationException from > the poll(). In comparison the first option seems a bit harder to use > because user have to also handle the PartitionTruncationException if > offsetsForLeaderEpochs() returns different offset from user-provided > offset. What do you think? > > If we decide to add API seek(offset, leaderEpoch), then we can decide > whether and how to add API to translate (offset, leaderEpoch) to offset. It > seems that this API will be needed by advanced user to don't want auto > offset reset (so that it can be notified) but still wants to reset offset > to closest. For those users if probably makes sense to only have the API in > AdminClient. offsetsForTimes() seems like a common API that will be needed > by user's of consumer in general, so it may be more reasonable to stay in > the consumer API. I don't have a strong opinion on whether > offsetsForTimes() should be replaced by API in AdminClient. 
> > Though (offset, leaderEpoch) is needed to uniquely identify a message in > general, it is only needed for advanced users who has turned on unclean > leader election, need to use seek(..), and don't want auto offset reset. > Most other users probably just want to enable auto offset reset and store > offset in Kafka. Thus we might want to keep the existing offset-only APIs > (e.g. seek() and position()) for most users while adding new APIs for > advanced users. And yes, it seems that we need new name for position(). > > Though I think we need new APIs to carry the new information (e.g. > leaderEpoch), I am not very sure how that should look like. One possible > option is those APIs in KIP-232. Another option is something like this: > > ` > class OffsetEpochs { > long offset; > int leaderEpoch; > int partitionEpoch; // This may be needed later as discussed in KIP-232 > ... // Hopefully these are all we need to identify message in Kafka. But > if we need more then we can add new fields in this class. > } > > OffsetEpochs offsetEpochs(TopicPartition); > > void seek(TopicPartition, OffsetEpochs); > `` > > > Thanks, > Dong > > > On Fri, Jun 29, 2018 at 11:13 AM, Jason Gustafson > wrote: > > > Hey Dong, >
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Jason,

Regarding seek(...), it seems that we want an API for the user to initialize the consumer with (offset, leaderEpoch), and that API should allow throwing PartitionTruncationException. Suppose we agree on this, then seekToNearest() is not sufficient because it will always swallow PartitionTruncationException. Here we have two options. The first option is to add an API offsetsForLeaderEpochs() to translate (leaderEpoch, offset) to offset. The second option is to add seek(offset, leaderEpoch). It seems that the second option may be simpler because it makes it clear that (offset, leaderEpoch) will be used to identify the consumer's position in a partition. And the user only needs to handle PartitionTruncationException from poll(). In comparison, the first option seems a bit harder to use because the user also has to handle the PartitionTruncationException if offsetsForLeaderEpochs() returns a different offset from the user-provided offset. What do you think?

If we decide to add the API seek(offset, leaderEpoch), then we can decide whether and how to add an API to translate (offset, leaderEpoch) to offset. It seems that this API will be needed by advanced users who don't want auto offset reset (so that they can be notified) but still want to reset the offset to the closest one. For those users it probably makes sense to only have the API in AdminClient. offsetsForTimes() seems like a common API that will be needed by users of the consumer in general, so it may be more reasonable for it to stay in the consumer API. I don't have a strong opinion on whether offsetsForTimes() should be replaced by an API in AdminClient.

Though (offset, leaderEpoch) is needed to uniquely identify a message in general, it is only needed for advanced users who have turned on unclean leader election, need to use seek(..), and don't want auto offset reset. Most other users probably just want to enable auto offset reset and store offsets in Kafka. Thus we might want to keep the existing offset-only APIs (e.g.
seek() and position()) for most users while adding new APIs for advanced users. And yes, it seems that we need a new name for position().

Though I think we need new APIs to carry the new information (e.g. leaderEpoch), I am not very sure what that should look like. One possible option is those APIs in KIP-232. Another option is something like this:

```
class OffsetEpochs {
  long offset;
  int leaderEpoch;
  int partitionEpoch; // This may be needed later as discussed in KIP-232
  ... // Hopefully these are all we need to identify message in Kafka. But if we need more then we can add new fields in this class.
}

OffsetEpochs offsetEpochs(TopicPartition);

void seek(TopicPartition, OffsetEpochs);
```

Thanks,
Dong

On Fri, Jun 29, 2018 at 11:13 AM, Jason Gustafson wrote: > Hey Dong, > > Thanks for the feedback. The first three points are easy: > > 1. Yes, we should be consistent. > 2. Yes, I will add this. > 3. Yes, I think we should document the changes to the committed offset > schema. I meant to do this, but it slipped my mind. > > The latter questions are tougher. One option I was considering is to have > only `offsetsForLeaderEpochs` exposed from the consumer and to drop the new > seek() API. That seems more consistent with the current use of > `offsetsForTimes` (we don't have a separate `seekToTimestamp` API). An > alternative might be to take a page from the AdminClient API and add a new > method to generalize offset lookup. For example, we could have > `lookupOffsets(LookupOptions)`. We could then deprecate `offsetsForTimes` > and this would open the door for future extensions without needing new > APIs. > > The case of position() is a little more annoying. It would have been better > had we let this return an object so that it is easier to extend. This is > the only reason I didn't add the API to the KIP. Maybe we should bite the > bullet and fix this now? Unfortunately we'll have to come up with a new > name. Maybe `currentPosition`? > > Thoughts?
> > -Jason > > > On Fri, Jun 29, 2018 at 10:40 AM, Dong Lin wrote: > > > Regarding points 4) and 5) above, motivation for the alternative APIs is > > that, if we decide that leaderEpoch is equally important as offset in > > identifying a message, then it may be reasonable to always specify it > > wherever offset is currently required in the consumer API to identify a > > message, e.g. position(), seek(). For example, since we allow user to > > retrieve offset using position() instead of asking user to keep track of > > the offset of the latest ConsumerRecord, may be it will be more > consistent > > for user to also retrieve leaderEpoch using position()? > > > > > > > > On Fri, Jun 29, 2018 at 10:30 AM, Dong Lin wrote: > > > > > Hey Jason, > > > > > > Thanks for the update. It looks pretty good. Just some minor comments > > > below: > > > > > > 1) The KIP adds new error code "LOG_TRUNCATION" and new exception > > TruncatedPartitionException. > > > Can we make the name more consistent, e.g.
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Dong, Thanks for the feedback. The first three points are easy: 1. Yes, we should be consistent. 2. Yes, I will add this. 3. Yes, I think we should document the changes to the committed offset schema. I meant to do this, but it slipped my mind. The latter questions are tougher. One option I was considering is to have only `offsetsForLeaderEpochs` exposed from the consumer and to drop the new seek() API. That seems more consistent with the current use of `offsetsForTimes` (we don't have a separate `seekToTimestamp` API). An alternative might be to take a page from the AdminClient API and add a new method to generalize offset lookup. For example, we could have `lookupOffsets(LookupOptions)`. We could then deprecate `offsetsForTimes` and this would open the door for future extensions without needing new APIs. The case of position() is a little more annoying. It would have been better had we let this return an object so that it is easier to extend. This is the only reason I didn't add the API to the KIP. Maybe we should bite the bullet and fix this now? Unfortunately we'll have to come up with a new name. Maybe `currentPosition`? Thoughts? -Jason On Fri, Jun 29, 2018 at 10:40 AM, Dong Lin wrote: > Regarding points 4) and 5) above, motivation for the alternative APIs is > that, if we decide that leaderEpoch is equally important as offset in > identifying a message, then it may be reasonable to always specify it > wherever offset is currently required in the consumer API to identify a > message, e.g. position(), seek(). For example, since we allow user to > retrieve offset using position() instead of asking user to keep track of > the offset of the latest ConsumerRecord, may be it will be more consistent > for user to also retrieve leaderEpoch using position()? > > > > On Fri, Jun 29, 2018 at 10:30 AM, Dong Lin wrote: > > > Hey Jason, > > > > Thanks for the update. It looks pretty good. 
Just some minor comments > > below: > > > > 1) The KIP adds new error code "LOG_TRUNCATION" and new exception > TruncatedPartitionException. > > Can we make the name more consistent, e.g. LogTruncationException? > > > > 2) Do we need to add UnknownLeaderEpochException as part of API change? > > > > 3) Not sure if the offset topic schema is also public API. If so, maybe > we > > should also include the schema change in the API? > > > > 4) For users who store offset externally, currently they get offset using > > position(..), store the offset externally, and use seek(..) to initialize > > the consumer next time. After this KIP they will need to store and use > the > > leaderEpoch together with the offset. Should we also update the API so > that > > user can also get leaderEpoch from position(...)? Not sure if it is OK to > > ask user to track the latest leaderEpoch of ConsumerRecord by themselves. > > > > 5) Also for users who store offset externally, they need to call seek(..) > > with leaderEpoch to initialize consumer. With current KIP users need to > > call seekToNearest(), whose name suggests that the final position may be > > different from what was requested. However, if users may want to avoid > auto > > offset reset and be notified explicitly when there is log truncation, > then seekToNearest() > > probably does not help here. Would it make sense to replace > seekToNearest() > > with seek(offset, leaderEpoch) + AminClient.offsetsForLeaderEpochs(...)? > > > > > > Thanks, > > Dong > > > > > > On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson > > wrote: > > > >> Hey Guozhang, > >> > >> That's fair. In fact, perhaps we do not need this API at all. We already > >> have the new seek() in this KIP which can do the lookup based on epoch > for > >> this use case. I guess we should probably call it seekToNearest() though > >> to > >> make it clear that the final position may be different from what was > >> requested. 
> >> > >> Thanks, > >> Jason > >> > >> On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang > >> wrote: > >> > >> > Hi Jason, > >> > > >> > I think it is less worthwhile to add KafkaConsumer#offsetsForLeader > >> Epochs, > >> > since probably only very advanced users are aware of the leaderEpoch, > >> and > >> > hence ever care to use it anyways. It is more like an admin client > >> > operation than a consumer client operation: if the motivation is to > >> > facility customized reset policy, maybe adding it as > >> > AdminClient#offsetsForLeaderEpochs > >> > is better as it is not an aggressive assumption that for such advanced > >> > users they are willing to use some admin client to get further > >> information? > >> > > >> > > >> > Guozhang > >> > > >> > > >> > On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson > >> > wrote: > >> > > >> > > Thanks for the feedback. I've updated the KIP. Specifically I > removed > >> the > >> > > "closest" reset option and the proposal to reset by timestamp when > the > >> > > precise truncation point cannot be determined. Instead, I proposed > >> that > >> > we > >> > >
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Regarding points 4) and 5) above, the motivation for the alternative APIs is that, if we decide that leaderEpoch is as important as offset in identifying a message, then it may be reasonable to always specify it wherever offset is currently required in the consumer API to identify a message, e.g. position(), seek(). For example, since we allow users to retrieve the offset using position() instead of asking them to keep track of the offset of the latest ConsumerRecord, maybe it would be more consistent for users to also retrieve the leaderEpoch using position()?

On Fri, Jun 29, 2018 at 10:30 AM, Dong Lin wrote: > Hey Jason, > > Thanks for the update. It looks pretty good. Just some minor comments > below: > > 1) The KIP adds new error code "LOG_TRUNCATION" and new exception > TruncatedPartitionException. > Can we make the name more consistent, e.g. LogTruncationException? > > 2) Do we need to add UnknownLeaderEpochException as part of API change? > > 3) Not sure if the offset topic schema is also public API. If so, maybe we > should also include the schema change in the API? > > 4) For users who store offset externally, currently they get offset using > position(..), store the offset externally, and use seek(..) to initialize > the consumer next time. After this KIP they will need to store and use the > leaderEpoch together with the offset. Should we also update the API so that > user can also get leaderEpoch from position(...)? Not sure if it is OK to > ask user to track the latest leaderEpoch of ConsumerRecord by themselves. > > 5) Also for users who store offset externally, they need to call seek(..) > with leaderEpoch to initialize consumer. With current KIP users need to > call seekToNearest(), whose name suggests that the final position may be > different from what was requested. However, if users may want to avoid auto > offset reset and be notified explicitly when there is log truncation, then > seekToNearest() > probably does not help here. 
Would it make sense to replace seekToNearest() > with seek(offset, leaderEpoch) + AminClient.offsetsForLeaderEpochs(...)? > > > Thanks, > Dong > > > On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson > wrote: > >> Hey Guozhang, >> >> That's fair. In fact, perhaps we do not need this API at all. We already >> have the new seek() in this KIP which can do the lookup based on epoch for >> this use case. I guess we should probably call it seekToNearest() though >> to >> make it clear that the final position may be different from what was >> requested. >> >> Thanks, >> Jason >> >> On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang >> wrote: >> >> > Hi Jason, >> > >> > I think it is less worthwhile to add KafkaConsumer#offsetsForLeader >> Epochs, >> > since probably only very advanced users are aware of the leaderEpoch, >> and >> > hence ever care to use it anyways. It is more like an admin client >> > operation than a consumer client operation: if the motivation is to >> > facility customized reset policy, maybe adding it as >> > AdminClient#offsetsForLeaderEpochs >> > is better as it is not an aggressive assumption that for such advanced >> > users they are willing to use some admin client to get further >> information? >> > >> > >> > Guozhang >> > >> > >> > On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson >> > wrote: >> > >> > > Thanks for the feedback. I've updated the KIP. Specifically I removed >> the >> > > "closest" reset option and the proposal to reset by timestamp when the >> > > precise truncation point cannot be determined. Instead, I proposed >> that >> > we >> > > always reset using the nearest epoch when a reset policy is defined >> > (either >> > > "earliest" or "latest"). Does that sound reasonable? >> > > >> > > One thing I am still debating is whether it would be better to have a >> > > separate API to find the closest offset using the leader epoch. 
In the >> > > current KIP, I suggested to piggyback this information on an >> exception, >> > but >> > > I'm beginning to think it would be better not to hide the lookup. It >> is >> > > awkward to implement since it means delaying the exception and the API >> > may >> > > actually be useful when customizing reset logic if no auto reset >> policy >> > is >> > > defined. I was thinking we can add an API like the following: >> > > >> > > Map >> > > offsetsForLeaderEpochs(Map epochsToSearch) >> > > >> > > Thoughts? >> > > >> > > -Jason >> > > >> > > >> > > >> > > >> > > On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson > > >> > > wrote: >> > > >> > > > @Dong >> > > > >> > > > Those are fair points. Both approaches require some fuzziness to >> reset >> > > the >> > > > offset in these pathological scenarios and we cannot guarantee >> > > > at-least-once delivery either way unless we have the full history of >> > > leader >> > > > epochs that were consumed. The KIP-101 logic may actually be more >> > > accurate >> > > > than using timestamps because it does not depend on the messages >> which >> > > are >> > > >
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Jason,

Thanks for the update. It looks pretty good. Just some minor comments below:

1) The KIP adds a new error code "LOG_TRUNCATION" and a new exception TruncatedPartitionException. Can we make the names more consistent, e.g. LogTruncationException?

2) Do we need to add UnknownLeaderEpochException as part of the API change?

3) I am not sure whether the offset topic schema is also public API. If so, maybe we should also include the schema change in the API?

4) Users who store offsets externally currently get the offset using position(..), store it externally, and use seek(..) to initialize the consumer next time. After this KIP they will need to store and use the leaderEpoch together with the offset. Should we also update the API so that users can also get the leaderEpoch from position(...)? I am not sure it is OK to ask users to track the latest leaderEpoch of ConsumerRecord by themselves.

5) Also for users who store offsets externally, they need to call seek(..) with leaderEpoch to initialize the consumer. With the current KIP, users need to call seekToNearest(), whose name suggests that the final position may be different from what was requested. However, if users want to avoid auto offset reset and be notified explicitly when there is log truncation, then seekToNearest() probably does not help here. Would it make sense to replace seekToNearest() with seek(offset, leaderEpoch) + AdminClient.offsetsForLeaderEpochs(...)?

Thanks,
Dong

On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson wrote: > Hey Guozhang, > > That's fair. In fact, perhaps we do not need this API at all. We already > have the new seek() in this KIP which can do the lookup based on epoch for > this use case. I guess we should probably call it seekToNearest() though to > make it clear that the final position may be different from what was > requested. 
> > Thanks, > Jason > > On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang wrote: > > > Hi Jason, > > > > I think it is less worthwhile to add KafkaConsumer# > offsetsForLeaderEpochs, > > since probably only very advanced users are aware of the leaderEpoch, and > > hence ever care to use it anyways. It is more like an admin client > > operation than a consumer client operation: if the motivation is to > > facility customized reset policy, maybe adding it as > > AdminClient#offsetsForLeaderEpochs > > is better as it is not an aggressive assumption that for such advanced > > users they are willing to use some admin client to get further > information? > > > > > > Guozhang > > > > > > On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson > > wrote: > > > > > Thanks for the feedback. I've updated the KIP. Specifically I removed > the > > > "closest" reset option and the proposal to reset by timestamp when the > > > precise truncation point cannot be determined. Instead, I proposed that > > we > > > always reset using the nearest epoch when a reset policy is defined > > (either > > > "earliest" or "latest"). Does that sound reasonable? > > > > > > One thing I am still debating is whether it would be better to have a > > > separate API to find the closest offset using the leader epoch. In the > > > current KIP, I suggested to piggyback this information on an exception, > > but > > > I'm beginning to think it would be better not to hide the lookup. It is > > > awkward to implement since it means delaying the exception and the API > > may > > > actually be useful when customizing reset logic if no auto reset policy > > is > > > defined. I was thinking we can add an API like the following: > > > > > > Map > > > offsetsForLeaderEpochs(Map epochsToSearch) > > > > > > Thoughts? > > > > > > -Jason > > > > > > > > > > > > > > > On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson > > > wrote: > > > > > > > @Dong > > > > > > > > Those are fair points. 
Both approaches require some fuzziness to > reset > > > the > > > > offset in these pathological scenarios and we cannot guarantee > > > > at-least-once delivery either way unless we have the full history of > > > leader > > > > epochs that were consumed. The KIP-101 logic may actually be more > > > accurate > > > > than using timestamps because it does not depend on the messages > which > > > are > > > > written after the unclean leader election. The case we're talking > about > > > > should be extremely rare in practice anyway. I also agree that we may > > not > > > > want to add new machinery if it only helps the old message format. > Ok, > > > > let's go ahead and drop the timestamp. > > > > > > > > @Guozhang > > > > > > > > * My current understanding is that, with unclean leader election > turned > > > on, > > > >> exactly-once is out of the window since we cannot guarantee that all > > > >> committed message markers will not be lost. And hence there is no > need > > > to > > > >> have special handling logic for LOG_TRUNCATED or OOR error codes > with > > > >> read.committed turned on. Is that right? > > > > > > > > > > > > Yes, that's right. EoS and unclean leader
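
For the externally stored offset case raised in points 4) and 5) above, here is a minimal sketch of what a user might persist once the leaderEpoch has to travel with the offset. Everything in it is an assumption for illustration: the class name StoredPosition, the "offset:leaderEpoch" text encoding, and the validation rules are hypothetical and are not part of the KIP or of any Kafka API.

```java
import java.util.Objects;

/** Hypothetical value type for illustration only; not a Kafka class. */
final class StoredPosition {
    final long offset;
    final int leaderEpoch;

    StoredPosition(long offset, int leaderEpoch) {
        if (offset < 0 || leaderEpoch < 0) {
            throw new IllegalArgumentException("offset and leaderEpoch must be non-negative");
        }
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }

    /** Encodes the pair as "offset:leaderEpoch" (an assumed format) for an external store. */
    String encode() {
        return offset + ":" + leaderEpoch;
    }

    /** Parses a value previously produced by encode(). */
    static StoredPosition decode(String encoded) {
        int sep = encoded.indexOf(':');
        if (sep < 0) {
            throw new IllegalArgumentException("expected offset:leaderEpoch, got " + encoded);
        }
        long offset = Long.parseLong(encoded.substring(0, sep));
        int epoch = Integer.parseInt(encoded.substring(sep + 1));
        return new StoredPosition(offset, epoch);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof StoredPosition)) return false;
        StoredPosition other = (StoredPosition) o;
        return offset == other.offset && leaderEpoch == other.leaderEpoch;
    }

    @Override
    public int hashCode() {
        return Objects.hash(offset, leaderEpoch);
    }
}
```

On restart, the decoded pair is what would be handed to whichever call the KIP ends up with, seek(offset, leaderEpoch) or seekToNearest(); that choice is exactly what is still being debated in this thread.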
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Sounds good to me. On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson wrote: > Hey Guozhang, > > That's fair. In fact, perhaps we do not need this API at all. We already > have the new seek() in this KIP which can do the lookup based on epoch for > this use case. I guess we should probably call it seekToNearest() though to > make it clear that the final position may be different from what was > requested. > > Thanks, > Jason > > On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang wrote: > > > Hi Jason, > > > > I think it is less worthwhile to add KafkaConsumer# > offsetsForLeaderEpochs, > > since probably only very advanced users are aware of the leaderEpoch, and > > hence ever care to use it anyways. It is more like an admin client > > operation than a consumer client operation: if the motivation is to > > facility customized reset policy, maybe adding it as > > AdminClient#offsetsForLeaderEpochs > > is better as it is not an aggressive assumption that for such advanced > > users they are willing to use some admin client to get further > information? > > > > > > Guozhang > > > > > > On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson > > wrote: > > > > > Thanks for the feedback. I've updated the KIP. Specifically I removed > the > > > "closest" reset option and the proposal to reset by timestamp when the > > > precise truncation point cannot be determined. Instead, I proposed that > > we > > > always reset using the nearest epoch when a reset policy is defined > > (either > > > "earliest" or "latest"). Does that sound reasonable? > > > > > > One thing I am still debating is whether it would be better to have a > > > separate API to find the closest offset using the leader epoch. In the > > > current KIP, I suggested to piggyback this information on an exception, > > but > > > I'm beginning to think it would be better not to hide the lookup. 
It is > > > awkward to implement since it means delaying the exception and the API > > may > > > actually be useful when customizing reset logic if no auto reset policy > > is > > > defined. I was thinking we can add an API like the following: > > > > > > Map > > > offsetsForLeaderEpochs(Map epochsToSearch) > > > > > > Thoughts? > > > > > > -Jason > > > > > > > > > > > > > > > On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson > > > wrote: > > > > > > > @Dong > > > > > > > > Those are fair points. Both approaches require some fuzziness to > reset > > > the > > > > offset in these pathological scenarios and we cannot guarantee > > > > at-least-once delivery either way unless we have the full history of > > > leader > > > > epochs that were consumed. The KIP-101 logic may actually be more > > > accurate > > > > than using timestamps because it does not depend on the messages > which > > > are > > > > written after the unclean leader election. The case we're talking > about > > > > should be extremely rare in practice anyway. I also agree that we may > > not > > > > want to add new machinery if it only helps the old message format. > Ok, > > > > let's go ahead and drop the timestamp. > > > > > > > > @Guozhang > > > > > > > > * My current understanding is that, with unclean leader election > turned > > > on, > > > >> exactly-once is out of the window since we cannot guarantee that all > > > >> committed message markers will not be lost. And hence there is no > need > > > to > > > >> have special handling logic for LOG_TRUNCATED or OOR error codes > with > > > >> read.committed turned on. Is that right? > > > > > > > > > > > > Yes, that's right. EoS and unclean leader election don't mix well. It > > may > > > > be worth considering separately whether we should try to reconcile > the > > > > transaction log following an unclean leader election. At least we may > > be > > > > able to prevent dangling transactions from blocking consumers. 
This > KIP > > > > does not address this problem. > > > > > > > > * MINOR: "if the epoch is greater than the minimum expected epoch, > that > > > the > > > >> new epoch does not begin at an earlier offset than the fetch offset. > > In > > > >> the latter case, the leader can respond with a new LOG_TRUNCATION > > error > > > >> code" should it be "does not begin at a later offset than the fetch > > > >> offset"? > > > > > > > > > > > > I think the comment is correct, though the phrasing may be confusing. > > We > > > > know truncation has occurred if there exists a larger epoch with a > > > starting > > > > offset that is lower than the fetch offset. Let me try to rephrase > > this. > > > > > > > > Thanks, > > > > Jason > > > > > > > > On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang > > > wrote: > > > > > > > >> Jason, thanks for the KIP. A few comments: > > > >> > > > >> * I think Dong's question about whether to use timestamp-based > > approach > > > >> v.s. start-offset-of-first-larger-epoch is valid; more > specifically, > > > with > > > >> timestamp-based approach we may still be reseting to an offset > falling > > > >> into > > > >> the
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Guozhang, That's fair. In fact, perhaps we do not need this API at all. We already have the new seek() in this KIP which can do the lookup based on epoch for this use case. I guess we should probably call it seekToNearest() though to make it clear that the final position may be different from what was requested. Thanks, Jason On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang wrote: > Hi Jason, > > I think it is less worthwhile to add KafkaConsumer#offsetsForLeaderEpochs, > since probably only very advanced users are aware of the leaderEpoch, and > hence ever care to use it anyways. It is more like an admin client > operation than a consumer client operation: if the motivation is to > facility customized reset policy, maybe adding it as > AdminClient#offsetsForLeaderEpochs > is better as it is not an aggressive assumption that for such advanced > users they are willing to use some admin client to get further information? > > > Guozhang > > > On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson > wrote: > > > Thanks for the feedback. I've updated the KIP. Specifically I removed the > > "closest" reset option and the proposal to reset by timestamp when the > > precise truncation point cannot be determined. Instead, I proposed that > we > > always reset using the nearest epoch when a reset policy is defined > (either > > "earliest" or "latest"). Does that sound reasonable? > > > > One thing I am still debating is whether it would be better to have a > > separate API to find the closest offset using the leader epoch. In the > > current KIP, I suggested to piggyback this information on an exception, > but > > I'm beginning to think it would be better not to hide the lookup. It is > > awkward to implement since it means delaying the exception and the API > may > > actually be useful when customizing reset logic if no auto reset policy > is > > defined. 
I was thinking we can add an API like the following: > > > > Map > > offsetsForLeaderEpochs(Map epochsToSearch) > > > > Thoughts? > > > > -Jason > > > > > > > > > > On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson > > wrote: > > > > > @Dong > > > > > > Those are fair points. Both approaches require some fuzziness to reset > > the > > > offset in these pathological scenarios and we cannot guarantee > > > at-least-once delivery either way unless we have the full history of > > leader > > > epochs that were consumed. The KIP-101 logic may actually be more > > accurate > > > than using timestamps because it does not depend on the messages which > > are > > > written after the unclean leader election. The case we're talking about > > > should be extremely rare in practice anyway. I also agree that we may > not > > > want to add new machinery if it only helps the old message format. Ok, > > > let's go ahead and drop the timestamp. > > > > > > @Guozhang > > > > > > * My current understanding is that, with unclean leader election turned > > on, > > >> exactly-once is out of the window since we cannot guarantee that all > > >> committed message markers will not be lost. And hence there is no need > > to > > >> have special handling logic for LOG_TRUNCATED or OOR error codes with > > >> read.committed turned on. Is that right? > > > > > > > > > Yes, that's right. EoS and unclean leader election don't mix well. It > may > > > be worth considering separately whether we should try to reconcile the > > > transaction log following an unclean leader election. At least we may > be > > > able to prevent dangling transactions from blocking consumers. This KIP > > > does not address this problem. > > > > > > * MINOR: "if the epoch is greater than the minimum expected epoch, that > > the > > >> new epoch does not begin at an earlier offset than the fetch offset. 
> In > > >> the latter case, the leader can respond with a new LOG_TRUNCATION > error > > >> code" should it be "does not begin at a later offset than the fetch > > >> offset"? > > > > > > > > > I think the comment is correct, though the phrasing may be confusing. > We > > > know truncation has occurred if there exists a larger epoch with a > > starting > > > offset that is lower than the fetch offset. Let me try to rephrase > this. > > > > > > Thanks, > > > Jason > > > > > > On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang > > wrote: > > > > > >> Jason, thanks for the KIP. A few comments: > > >> > > >> * I think Dong's question about whether to use timestamp-based > approach > > >> v.s. start-offset-of-first-larger-epoch is valid; more specifically, > > with > > >> timestamp-based approach we may still be reseting to an offset falling > > >> into > > >> the truncated interval, and hence we may still miss some data, i.e. > not > > >> guaranteeing at-least-once still. With the > > >> start-offset-of-first-larger-epoch, I'm not sure if it will guarantee > > no > > >> valid data is missed when we have consecutive log truncations (maybe > we > > >> need to look back into details of KIP-101 to figure it out). If the > > latter
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hi Jason,

I think it is less worthwhile to add KafkaConsumer#offsetsForLeaderEpochs, since probably only very advanced users are aware of the leaderEpoch and would ever care to use it anyway. It is more like an admin client operation than a consumer client operation: if the motivation is to facilitate a customized reset policy, maybe adding it as AdminClient#offsetsForLeaderEpochs is better, since it is not an aggressive assumption that such advanced users are willing to use an admin client to get further information?

Guozhang

On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson wrote: > Thanks for the feedback. I've updated the KIP. Specifically I removed the > "closest" reset option and the proposal to reset by timestamp when the > precise truncation point cannot be determined. Instead, I proposed that we > always reset using the nearest epoch when a reset policy is defined (either > "earliest" or "latest"). Does that sound reasonable? > > One thing I am still debating is whether it would be better to have a > separate API to find the closest offset using the leader epoch. In the > current KIP, I suggested to piggyback this information on an exception, but > I'm beginning to think it would be better not to hide the lookup. It is > awkward to implement since it means delaying the exception and the API may > actually be useful when customizing reset logic if no auto reset policy is > defined. I was thinking we can add an API like the following: > > Map > offsetsForLeaderEpochs(Map epochsToSearch) > > Thoughts? > > -Jason > > > > > On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson > wrote: > > > @Dong > > > > Those are fair points. Both approaches require some fuzziness to reset > the > > offset in these pathological scenarios and we cannot guarantee > > at-least-once delivery either way unless we have the full history of > leader > > epochs that were consumed. 
The KIP-101 logic may actually be more > accurate > > than using timestamps because it does not depend on the messages which > are > > written after the unclean leader election. The case we're talking about > > should be extremely rare in practice anyway. I also agree that we may not > > want to add new machinery if it only helps the old message format. Ok, > > let's go ahead and drop the timestamp. > > > > @Guozhang > > > > * My current understanding is that, with unclean leader election turned > on, > >> exactly-once is out of the window since we cannot guarantee that all > >> committed message markers will not be lost. And hence there is no need > to > >> have special handling logic for LOG_TRUNCATED or OOR error codes with > >> read.committed turned on. Is that right? > > > > > > Yes, that's right. EoS and unclean leader election don't mix well. It may > > be worth considering separately whether we should try to reconcile the > > transaction log following an unclean leader election. At least we may be > > able to prevent dangling transactions from blocking consumers. This KIP > > does not address this problem. > > > > * MINOR: "if the epoch is greater than the minimum expected epoch, that > the > >> new epoch does not begin at an earlier offset than the fetch offset. In > >> the latter case, the leader can respond with a new LOG_TRUNCATION error > >> code" should it be "does not begin at a later offset than the fetch > >> offset"? > > > > > > I think the comment is correct, though the phrasing may be confusing. We > > know truncation has occurred if there exists a larger epoch with a > starting > > offset that is lower than the fetch offset. Let me try to rephrase this. > > > > Thanks, > > Jason > > > > On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang > wrote: > > > >> Jason, thanks for the KIP. A few comments: > >> > >> * I think Dong's question about whether to use timestamp-based approach > >> v.s. 
start-offset-of-first-larger-epoch is valid; more specifically, > with > >> timestamp-based approach we may still be reseting to an offset falling > >> into > >> the truncated interval, and hence we may still miss some data, i.e. not > >> guaranteeing at-least-once still. With the > >> start-offset-of-first-larger-epoch, I'm not sure if it will guarantee > no > >> valid data is missed when we have consecutive log truncations (maybe we > >> need to look back into details of KIP-101 to figure it out). If the > latter > >> can indeed guarantee at least once, we could consider using that > approach. > >> > >> * My current understanding is that, with unclean leader election turned > >> on, > >> exactly-once is out of the window since we cannot guarantee that all > >> committed message markers will not be lost. And hence there is no need > to > >> have special handling logic for LOG_TRUNCATED or OOR error codes with > >> read.committed turned on. Is that right? > >> > >> * MINOR: "if the epoch is greater than the minimum expected epoch, that > >> the > >> new epoch does not begin at an earlier offset than the fetch offset. In > >> the latter
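
The truncation rule quoted above ("truncation has occurred if there exists a larger epoch with a starting offset that is lower than the fetch offset") can be sketched as follows. This is only an illustration of the rule as stated in the thread, not the broker's implementation: the class TruncationCheck, the method isTruncated, and the in-memory map of leader epoch to start offset are all hypothetical names and structures.

```java
import java.util.NavigableMap;

/** Hypothetical helper illustrating the rule from the thread; not Kafka code. */
final class TruncationCheck {
    /**
     * @param epochStartOffsets leader epoch -> first offset written in that epoch (assumed input)
     * @param lastFetchedEpoch  epoch of the last record the fetcher consumed
     * @param fetchOffset       next offset the fetcher wants to read
     * @return true if some epoch greater than lastFetchedEpoch starts below fetchOffset
     */
    static boolean isTruncated(NavigableMap<Integer, Long> epochStartOffsets,
                               int lastFetchedEpoch,
                               long fetchOffset) {
        // Only epochs strictly greater than the fetcher's epoch are relevant.
        for (long startOffset : epochStartOffsets.tailMap(lastFetchedEpoch, false).values()) {
            if (startOffset < fetchOffset) {
                return true;
            }
        }
        return false;
    }
}
```

For example, with epochs 0, 1, 2 starting at offsets 0, 100, 150, a fetcher that last saw epoch 1 and asks for offset 180 has read records in the range 150 to 179 that the leader later replaced under epoch 2, so the check reports truncation.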
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Thanks for the feedback. I've updated the KIP. Specifically, I removed the "closest" reset option and the proposal to reset by timestamp when the precise truncation point cannot be determined. Instead, I proposed that we always reset using the nearest epoch when a reset policy is defined (either "earliest" or "latest"). Does that sound reasonable?

One thing I am still debating is whether it would be better to have a separate API to find the closest offset using the leader epoch. In the current KIP, I suggested piggybacking this information on an exception, but I'm beginning to think it would be better not to hide the lookup. It is awkward to implement since it means delaying the exception, and the API may actually be useful when customizing reset logic if no auto reset policy is defined. I was thinking we can add an API like the following:

    Map offsetsForLeaderEpochs(Map epochsToSearch)

Thoughts?

-Jason
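To make the lookup discussed above concrete, here is a minimal sketch of what such an epoch-based search could compute for a single partition. It is not the KIP's implementation: the class name, method names, and the in-memory epoch map are hypothetical, and the rule used (the end offset of an epoch is the start offset of the first larger epoch, or the log end offset if there is none) is a simplified reading of the KIP-101 logic mentioned in this thread.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of the Kafka client API.
final class EpochEndOffsetSketch {

    // epochStartOffsets maps each leader epoch to the first offset written in that epoch.
    // Returns the start offset of the first epoch larger than `epoch`,
    // or logEndOffset when no larger epoch exists.
    static long endOffsetForEpoch(NavigableMap<Integer, Long> epochStartOffsets,
                                  int epoch, long logEndOffset) {
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(epoch);
        return next == null ? logEndOffset : next.getValue();
    }

    // A consumer positioned past the end of its last consumed epoch falls back
    // to that end offset; otherwise its position is still valid and is kept.
    static long resetPosition(NavigableMap<Integer, Long> epochStartOffsets,
                              int lastConsumedEpoch, long position, long logEndOffset) {
        long endOffset = endOffsetForEpoch(epochStartOffsets, lastConsumedEpoch, logEndOffset);
        return Math.min(position, endOffset);
    }

    public static void main(String[] args) {
        NavigableMap<Integer, Long> cache = new TreeMap<>();
        cache.put(0, 0L);
        cache.put(1, 100L);
        cache.put(3, 150L);
        // Consumer last read epoch 1 and sits at offset 180; epoch 3 began at 150.
        System.out.println(resetPosition(cache, 1, 180L, 200L));
    }
}
```

Exposing the lookup as its own call, rather than hiding it inside an exception, would let an application with no auto reset policy apply a rule like resetPosition itself.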
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
@Dong

Those are fair points. Both approaches require some fuzziness to reset the offset in these pathological scenarios, and we cannot guarantee at-least-once delivery either way unless we have the full history of leader epochs that were consumed. The KIP-101 logic may actually be more accurate than using timestamps because it does not depend on the messages which are written after the unclean leader election. The case we're talking about should be extremely rare in practice anyway. I also agree that we may not want to add new machinery if it only helps the old message format. Ok, let's go ahead and drop the timestamp.

@Guozhang

> * My current understanding is that, with unclean leader election turned on,
> exactly-once is out of the window since we cannot guarantee that all
> committed message markers will not be lost. And hence there is no need to
> have special handling logic for LOG_TRUNCATED or OOR error codes with
> read.committed turned on. Is that right?

Yes, that's right. EoS and unclean leader election don't mix well. It may be worth considering separately whether we should try to reconcile the transaction log following an unclean leader election. At least we may be able to prevent dangling transactions from blocking consumers. This KIP does not address this problem.

> * MINOR: "if the epoch is greater than the minimum expected epoch, that the
> new epoch does not begin at an earlier offset than the fetch offset. In
> the latter case, the leader can respond with a new LOG_TRUNCATION error
> code" should it be "does not begin at a later offset than the fetch
> offset"?

I think the comment is correct, though the phrasing may be confusing. We know truncation has occurred if there exists a larger epoch with a starting offset that is lower than the fetch offset. Let me try to rephrase this.

Thanks,
Jason
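The truncation condition stated in the reply above (a larger epoch exists whose starting offset is lower than the fetch offset) can be sketched as a small check. This is only an illustration under stated assumptions: the class and method names are hypothetical, and the leader's epoch history is modeled as a plain sorted map from epoch to start offset rather than Kafka's actual leader epoch cache.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical illustration of the check described in the thread; not Kafka code.
final class TruncationCheckSketch {

    // Returns true when some epoch larger than fetchEpoch starts at an offset
    // lower than fetchOffset, i.e. the log diverged before the fetch position.
    static boolean truncationDetected(NavigableMap<Integer, Long> epochStartOffsets,
                                      int fetchEpoch, long fetchOffset) {
        // tailMap(fetchEpoch, false) keeps only epochs strictly larger than fetchEpoch.
        for (long startOffset : epochStartOffsets.tailMap(fetchEpoch, false).values()) {
            if (startOffset < fetchOffset) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        NavigableMap<Integer, Long> cache = new TreeMap<>();
        cache.put(0, 0L);
        cache.put(1, 100L);
        cache.put(2, 140L);
        // The fetcher believes epoch 1 extends to offset 160, but epoch 2 began at 140.
        System.out.println(truncationDetected(cache, 1, 160L));
    }
}
```

In this sketch, a fetch at offset 130 with epoch 1 is not flagged, because epoch 2 begins at 140, after the fetch position.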
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Jason, thanks for the KIP. A few comments:

* I think Dong's question about whether to use the timestamp-based approach vs. start-offset-of-first-larger-epoch is valid; more specifically, with the timestamp-based approach we may still be resetting to an offset falling into the truncated interval, and hence we may still miss some data, i.e. still not guaranteeing at-least-once. With start-offset-of-first-larger-epoch, I'm not sure if it will guarantee no valid data is missed when we have consecutive log truncations (maybe we need to look back into the details of KIP-101 to figure it out). If the latter can indeed guarantee at-least-once, we could consider using that approach.

* My current understanding is that, with unclean leader election turned on, exactly-once is out of the window since we cannot guarantee that all committed message markers will not be lost. And hence there is no need to have special handling logic for LOG_TRUNCATED or OOR error codes with read.committed turned on. Is that right?

* MINOR: "if the epoch is greater than the minimum expected epoch, that the new epoch does not begin at an earlier offset than the fetch offset. In the latter case, the leader can respond with a new LOG_TRUNCATION error code" should it be "does not begin at a later offset than the fetch offset"?

Guozhang
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Jason,

Thanks for the explanation.

Please correct me if this is wrong. The "unknown truncation offset" scenario happens when the consumer does not have the full leaderEpoch -> offset mapping. In this case we can still use the KIP-101-based approach to truncate the offset to "start offset of the first Leader Epoch larger than last epoch of the consumer", but it may be inaccurate. So the KIP chooses to use the timestamp-based approach, which is also best-effort.

If this understanding is correct, for the "closest" offset reset policy and the "unknown truncation offset" scenario, I am wondering whether it may be better to replace the timestamp-based approach with the KIP-101-based approach. In comparison to the timestamp-based approach, the KIP-101-based approach seems to simplify the API a bit since the user does not need to understand timestamps. Both approaches are best-effort and do not guarantee that the consumer can consume all messages. It is not like KIP-279, which guarantees that the follower broker can consume all messages from the leader.

Then it seems that the remaining difference is mostly about accuracy, i.e. how many messages will be duplicated or missed in the "unknown truncation offset" scenario. Not sure either one is clearly better than the other. Note that there are two scenarios mentioned in KIP-279 which are not addressed by KIP-101. Both scenarios require quick leadership change between brokers, which seems to suggest that the offset obtained by "start offset of the first Leader Epoch larger than last epoch of the consumer" under these two scenarios may be very close to the offset obtained by the message timestamp. Does this sound reasonable?

Good point that users on the v1 format can benefit from the timestamp-based approach. On the other hand, it seems like a short-term benefit for users who have not migrated. I am just not sure whether it is more important than designing a better API.

Also, for both the "latest" and "earliest" reset policies, do you think it would make sense to also use the KIP-101-based approach to truncate the offset for the "unknown truncation offset" scenario?

Thanks,
Dong
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
The other thing I forgot to mention is that resetting the offset using the leader epoch is only available with the latest message format. By supporting reset by timestamp, users on the v1 format can still get some benefit from this KIP.

-Jason
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Dong,

Thanks for the comments.

> - The KIP says that, with auto.offset.reset="closest", timestamp is used to
> find offset if truncation offset is unknown. It seems that if consumer
> knows the timestamp of the last message, then the consumer should also know
> the (offset, leaderEpoch) of the last message which can then be used for
> find the truncation offset. Can you explain why truncation offset is
> unknown in this case?

The intent of the new reset policy is to automatically locate the closest offset within the limits of Kafka log semantics. Unlike replicas, consumers do not know the full history of leader epochs that have been previously read. In some scenarios, they may not be able to precisely find the offset where the log diverged after a sequence of unclean leader elections (see KIP-279 for more detail). It seemed unfortunate in these cases to have to resort to coarse-grained resetting using either the earliest or latest offset. Using the timestamp, we can find a more accurate reset point and minimize the amount of loss or duplication.

> - How does consumer differentiates between "Offset out of range (too low)"
> and "Offset out of range (unknown truncation offset)", i.e. the two columns
> in table provided in the KIP?

We know when an offset is too low because we have the start offset of the log from the fetch response. Following this KIP, that should really be the only time we get an OutOfRange error (other than buggy application code). The other two cases are distinguished based on whether we are able to find the right offset of divergence.

> - It is probably a typo. Maybe fix "This is not the last The" in the
> Proposed Section.

Thanks. Magnus noticed this too and I fixed it earlier this morning. Good to know who's actually reading the proposal!

-Jason
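The three cases described in the reply above can be sketched as a small classification. This is a hedged illustration, not the consumer's actual logic: the enum, class, and parameter names are hypothetical, and the divergence offset is assumed to be supplied by some earlier epoch-based lookup (empty when that lookup could not pin down the point of divergence).

```java
import java.util.OptionalLong;

// Hypothetical illustration of the case split discussed in the thread; not Kafka code.
final class OutOfRangeSketch {

    enum Kind { OFFSET_TOO_LOW, TRUNCATED_AT_KNOWN_OFFSET, TRUNCATED_AT_UNKNOWN_OFFSET }

    // logStartOffset is assumed to come from the fetch response, as the reply describes.
    static Kind classify(long fetchOffset, long logStartOffset, OptionalLong divergenceOffset) {
        if (fetchOffset < logStartOffset) {
            // The requested data is no longer in the log (too low).
            return Kind.OFFSET_TOO_LOW;
        }
        // Otherwise the cases differ only in whether the divergence point was found.
        return divergenceOffset.isPresent()
                ? Kind.TRUNCATED_AT_KNOWN_OFFSET
                : Kind.TRUNCATED_AT_UNKNOWN_OFFSET;
    }

    public static void main(String[] args) {
        System.out.println(classify(5L, 20L, OptionalLong.empty()));
        System.out.println(classify(180L, 20L, OptionalLong.of(150L)));
        System.out.println(classify(180L, 20L, OptionalLong.empty()));
    }
}
```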
Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey Jason,

Thanks for the KIP! It is pretty useful.

At a high level, the new set of reset policies may be a bit complicated and confusing to users. I am wondering whether we can simplify it. A few questions below:

- The KIP says that, with auto.offset.reset="closest", the timestamp is used to find the offset if the truncation offset is unknown. It seems that if the consumer knows the timestamp of the last message, then the consumer should also know the (offset, leaderEpoch) of the last message, which can then be used to find the truncation offset. Can you explain why the truncation offset is unknown in this case?

- How does the consumer differentiate between "Offset out of range (too low)" and "Offset out of range (unknown truncation offset)", i.e. the two columns in the table provided in the KIP?

- It is probably a typo. Maybe fix "This is not the last The" in the Proposed Section.

Thanks,
Dong
[DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation
Hey All,

I wrote up a KIP to handle one more edge case in the replication protocol and to support better handling of truncation in the consumer when unclean leader election is enabled. Let me know what you think.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation

Thanks to Anna Povzner and Dong Lin for initial feedback.

Thanks,
Jason