Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-07 Thread Jason Gustafson
Hey Dong,

I see what you mean. This would have been clearer if the committed offset
was the offset of the last consumed record. I don't feel too strongly about
it either. Perhaps we can use the more concise name and just rely on the
documentation to explain its usage. It should be rare that users have to think
about this anyway.

-Jason


On Tue, Aug 7, 2018 at 1:35 PM, Dong Lin  wrote:

> Hey Jason,
>
> Thanks for the reply. Regarding 3), I am thinking that both "Offset" and
> "LastLeaderEpoch" in the OffsetCommitRequest are associated with the last
> consumed messages. Value of "Offset" is not necessarily the offset of the
> next message due to log compaction. Since we are not naming "Offset" as e.g.
> "NextOffset", it may be simpler to use "LeaderEpoch".
>
> I don't feel strongly about this. If we decide to name the new field
> "LastLeaderEpoch", would it be more consistent to also use "LastLeaderEpoch"
> for the new field in the Offset Commit Value Schema?
>
>
> Thanks,
> Dong
>
>
> On Tue, Aug 7, 2018 at 1:23 PM, Jason Gustafson 
> wrote:
>
> > Hi Dong,
> >
> > Thanks for the comments.
> >
> > 1) Yes, makes sense.
> >
> > 2) This is an interesting point. The suggestion made more sense in the
> > initial version of the KIP, but I think you are right that we should use
> > the same fencing semantics we use for the Fetch and OffsetForLeaderEpoch
> > APIs. Just like a following replica, we need to protect the initial fetch
> > to a leader. If the initial fetch position was obtained from a broker
> with
> > the same epoch, then we can begin fetching. Otherwise, we need the
> > OffsetForLeaderEpoch dance. And it is better to avoid using offsets
> > obtained from stale leaders in the first place. I'll update the KIP.
> >
> > 3) The committed offset is actually the next offset that a consumer will
> > read, right? The leader epoch on the other hand is the last one that was
> > consumed.
> >
> > 4) Yes, that is right. And it is a good point that the epoch we use for
> log
> > reconciliation must be less than or equal to the current leader epoch. I
> > will mention this.
> >
> > Thanks,
> > Jason
> >
> > On Tue, Aug 7, 2018 at 12:43 PM, Jason Gustafson 
> > wrote:
> >
> > > Hey Jun,
> > >
> > > 57. It's a fair point. I could go either way, but I'm slightly inclined
> > to
> > > just document the new API for now. We'll still support seeking to an
> > offset
> > > with corresponding epoch information, so deprecating the old seek()
> seems
> > > like overkill.
> > >
> > > 60. The phrasing was a little confusing. Does this sound better?
> > >
> > > "Log truncation is detected if there exists a leader epoch which is
> > > larger than this epoch and begins at an offset earlier than the
> committed
> > > offset."
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > > On Tue, Aug 7, 2018 at 12:11 PM, Dong Lin  wrote:
> > >
> > >> Hey Jason,
> > >>
> > >> Thanks for the update. I have some comments below:
> > >>
> > >> 1) Since FencedLeaderEpochException indicates that the metadata in the
> > >> client is outdated, should it extend InvalidMetadataException?
> > >>
> > >> 2) It is mentioned that "To fix the problem with KIP-232, we will add
> > the
> > >> leader epoch to the ListOffsets response. The consumer will use this in
> its
> > >> first fetch request after resetting offsets". If consumer sends
> > >> ListOffsetRequest to the broker who is no longer the leader, and the
> > >> broker
> > >> still thinks it is the leader, then the broker may return
> > >> ListOffsetResponse
> > >> whose leaderEpoch is smaller than the leaderEpoch in the metadata of
> the
> > >> consumer. In this case consumer probably should not just send
> > FetchRequest
> > >> with the leaderEpoch of the ListOffsetResponse, right? I am wondering
> > >> whether we should also include CurrentLeaderEpoch in the
> > >> ListOffsetRequest.
> > >>
> > >> 3) Currently the new field added in the OffsetCommitRequest/
> > >> OffsetFetchResponse is named LastLeaderEpoch. For the same reason that
> > we
> > >> are not naming the existing field "Offset" as "LastOffset", would it
> be
> > >> more consistent to just name the new field as LeaderEpoch? Same for
> the
> > >> new
> > >> API in the class OffsetAndMetadata.
> > >>
> > >> 4) Could we clarify in the KIP where the value of CurrentLeaderEpoch
> in
> > >> the
> > >> FetchRequest comes from? I suppose this value can be updated by the
> > >> MetadataResponse, right? If so, maybe we should also clarify that
> client
> > >> should reject MetadataResponse if the leaderEpoch in the metadata
> > response
> > >> is smaller than what the client also knows from e.g.
> > >> seek(...), OffsetFetchResponse?
> > >>
> > >>
> > >> Thanks,
> > >> Dong
> > >>
> > >>
> > >> On Mon, Aug 6, 2018 at 5:30 PM, Jun Rao  wrote:
> > >>
> > >> > Hi, Jason,
> > >> >
> > >> > Thanks for the reply. They all make sense. Just a couple of more
> minor
> > >> > comments.
> > >> >
> > >> > 57. I was thinking that it will be useful to encourage people to use
> > 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-07 Thread Dong Lin
Hey Jason,

Thanks for the reply. Regarding 3), I am thinking that both "Offset" and
"LastLeaderEpoch" in the OffsetCommitRequest are associated with the last
consumed messages. Value of "Offset" is not necessarily the offset of the
next message due to log compaction. Since we are not naming "Offset" as e.g.
"NextOffset", it may be simpler to use "LeaderEpoch".

I don't feel strongly about this. If we decide to name the new field
"LastLeaderEpoch", would it be more consistent to also use "LastLeaderEpoch"
for the new field in the Offset Commit Value Schema?


Thanks,
Dong


On Tue, Aug 7, 2018 at 1:23 PM, Jason Gustafson  wrote:

> Hi Dong,
>
> Thanks for the comments.
>
> 1) Yes, makes sense.
>
> 2) This is an interesting point. The suggestion made more sense in the
> initial version of the KIP, but I think you are right that we should use
> the same fencing semantics we use for the Fetch and OffsetForLeaderEpoch
> APIs. Just like a following replica, we need to protect the initial fetch
> to a leader. If the initial fetch position was obtained from a broker with
> the same epoch, then we can begin fetching. Otherwise, we need the
> OffsetForLeaderEpoch dance. And it is better to avoid using offsets
> obtained from stale leaders in the first place. I'll update the KIP.
>
> 3) The committed offset is actually the next offset that a consumer will
> read, right? The leader epoch on the other hand is the last one that was
> consumed.
>
> 4) Yes, that is right. And it is a good point that the epoch we use for log
> reconciliation must be less than or equal to the current leader epoch. I
> will mention this.
>
> Thanks,
> Jason
>
> On Tue, Aug 7, 2018 at 12:43 PM, Jason Gustafson 
> wrote:
>
> > Hey Jun,
> >
> > 57. It's a fair point. I could go either way, but I'm slightly inclined
> to
> > just document the new API for now. We'll still support seeking to an
> offset
> > with corresponding epoch information, so deprecating the old seek() seems
> > like overkill.
> >
> > 60. The phrasing was a little confusing. Does this sound better?
> >
> > "Log truncation is detected if there exists a leader epoch which is
> > larger than this epoch and begins at an offset earlier than the committed
> > offset."
> >
> > Thanks,
> > Jason
> >
> >
> > On Tue, Aug 7, 2018 at 12:11 PM, Dong Lin  wrote:
> >
> >> Hey Jason,
> >>
> >> Thanks for the update. I have some comments below:
> >>
> >> 1) Since FencedLeaderEpochException indicates that the metadata in the
> >> client is outdated, should it extend InvalidMetadataException?
> >>
> >> 2) It is mentioned that "To fix the problem with KIP-232, we will add
> the
> >> leader epoch to the ListOffsets response. The consumer will use this in its
> >> first fetch request after resetting offsets". If consumer sends
> >> ListOffsetRequest to the broker who is no longer the leader, and the
> >> broker
> >> still thinks it is the leader, then the broker may return
> >> ListOffsetResponse
> >> whose leaderEpoch is smaller than the leaderEpoch in the metadata of the
> >> consumer. In this case consumer probably should not just send
> FetchRequest
> >> with the leaderEpoch of the ListOffsetResponse, right? I am wondering
> >> whether we should also include CurrentLeaderEpoch in the
> >> ListOffsetRequest.
> >>
> >> 3) Currently the new field added in the OffsetCommitRequest/
> >> OffsetFetchResponse is named LastLeaderEpoch. For the same reason that
> we
> >> are not naming the existing field "Offset" as "LastOffset", would it be
> >> more consistent to just name the new field as LeaderEpoch? Same for the
> >> new
> >> API in the class OffsetAndMetadata.
> >>
> >> 4) Could we clarify in the KIP where the value of CurrentLeaderEpoch in
> >> the
> >> FetchRequest comes from? I suppose this value can be updated by the
> >> MetadataResponse, right? If so, maybe we should also clarify that client
> >> should reject MetadataResponse if the leaderEpoch in the metadata
> response
> >> is smaller than what the client also knows from e.g.
> >> seek(...), OffsetFetchResponse?
> >>
> >>
> >> Thanks,
> >> Dong
> >>
> >>
> >> On Mon, Aug 6, 2018 at 5:30 PM, Jun Rao  wrote:
> >>
> >> > Hi, Jason,
> >> >
> >> > Thanks for the reply. They all make sense. Just a couple of more minor
> >> > comments.
> >> >
> >> > 57. I was thinking that it will be useful to encourage people to use
> the
> >> > new seek() api to get better semantics. Deprecating the old seek api
> is
> >> one
> >> > way. I guess we could also just document it for now.
> >> >
> >> > 60. "Log truncation is detected if the first offset of the epoch for
> the
> >> > committed offset is larger than this epoch and begins at an earlier
> >> > offset." It seems that we should add "that" before "is larger than"?
> >> >
> >> > Thanks,
> >> >
> >> > Jun
> >> >
> >> >
> >> > On Mon, Aug 6, 2018 at 3:07 PM, Jason Gustafson 
> >> > wrote:
> >> >
> >> > > Hi Jun,
> >> > >
> >> > > I spent a little more time looking at the usage in WorkerSinkTask. I
> >> > think
> >> > > 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-07 Thread Jason Gustafson
Hi Dong,

Thanks for the comments.

1) Yes, makes sense.

2) This is an interesting point. The suggestion made more sense in the
initial version of the KIP, but I think you are right that we should use
the same fencing semantics we use for the Fetch and OffsetForLeaderEpoch
APIs. Just like a following replica, we need to protect the initial fetch
to a leader. If the initial fetch position was obtained from a broker with
the same epoch, then we can begin fetching. Otherwise, we need the
OffsetForLeaderEpoch dance. And it is better to avoid using offsets
obtained from stale leaders in the first place. I'll update the KIP.

3) The committed offset is actually the next offset that a consumer will
read, right? The leader epoch on the other hand is the last one that was
consumed.

4) Yes, that is right. And it is a good point that the epoch we use for log
reconciliation must be less than or equal to the current leader epoch. I
will mention this.
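
To make 2) a bit more concrete, here is a rough sketch of the client-side flow I
have in mind (the names below are purely illustrative, not actual consumer
internals):

// Sketch only: decide whether the initial fetch position needs epoch
// validation before the first fetch, per the fencing semantics above.
final class InitialFetchGuard {
    interface EpochValidator {
        // e.g. issues OffsetsForLeaderEpoch and returns a safe offset to fetch from
        long validate(long offset, int lastConsumedEpoch, int currentLeaderEpoch);
    }

    static long resolveFetchOffset(long offset,
                                   int epochOfOffsetSource,  // epoch of the broker that provided the offset
                                   int lastConsumedEpoch,    // epoch attached to the current position, if any
                                   int currentLeaderEpoch,   // leader epoch from the latest metadata
                                   EpochValidator validator) {
        if (epochOfOffsetSource == currentLeaderEpoch) {
            // The position was obtained from a broker on the same leader epoch,
            // so it cannot be stale and fetching can begin immediately.
            return offset;
        }
        // Otherwise do the OffsetsForLeaderEpoch "dance" to detect truncation
        // (or an offset obtained from a stale leader) before the first fetch.
        return validator.validate(offset, lastConsumedEpoch, currentLeaderEpoch);
    }
}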

Thanks,
Jason

On Tue, Aug 7, 2018 at 12:43 PM, Jason Gustafson  wrote:

> Hey Jun,
>
> 57. It's a fair point. I could go either way, but I'm slightly inclined to
> just document the new API for now. We'll still support seeking to an offset
> with corresponding epoch information, so deprecating the old seek() seems
> like overkill.
>
> 60. The phrasing was a little confusing. Does this sound better?
>
> "Log truncation is detected if there exists a leader epoch which is
> larger than this epoch and begins at an offset earlier than the committed
> offset."
>
> Thanks,
> Jason
>
>
> On Tue, Aug 7, 2018 at 12:11 PM, Dong Lin  wrote:
>
>> Hey Jason,
>>
>> Thanks for the update. I have some comments below:
>>
>> 1) Since FencedLeaderEpochException indicates that the metadata in the
>> client is outdated, should it extend InvalidMetadataException?
>>
>> 2) It is mentioned that "To fix the problem with KIP-232, we will add the
>> leader epoch to the ListOffsets response. The consumer will use this in its
>> first fetch request after resetting offsets". If consumer sends
>> ListOffsetRequest to the broker who is no longer the leader, and the
>> broker
>> still thinks it is the leader, then the broker may return
>> ListOffsetResponse
>> whose leaderEpoch is smaller than the leaderEpoch in the metadata of the
>> consumer. In this case consumer probably should not just send FetchRequest
>> with the leaderEpoch of the ListOffsetResponse, right? I am wondering
>> whether we should also include CurrentLeaderEpoch in the
>> ListOffsetRequest.
>>
>> 3) Currently the new field added in the OffsetCommitRequest/
>> OffsetFetchResponse is named LastLeaderEpoch. For the same reason that we
>> are not naming the existing field "Offset" as "LastOffset", would it be
>> more consistent to just name the new field as LeaderEpoch? Same for the
>> new
>> API in the class OffsetAndMetadata.
>>
>> 4) Could we clarify in the KIP where the value of CurrentLeaderEpoch in
>> the
>> FetchRequest comes from? I suppose this value can be updated by the
>> MetadataResponse, right? If so, maybe we should also clarify that client
>> should reject MetadataResponse if the leaderEpoch in the metadata response
>> is smaller than what the client also knows from e.g.
>> seek(...), OffsetFetchResponse?
>>
>>
>> Thanks,
>> Dong
>>
>>
>> On Mon, Aug 6, 2018 at 5:30 PM, Jun Rao  wrote:
>>
>> > Hi, Jason,
>> >
>> > Thanks for the reply. They all make sense. Just a couple of more minor
>> > comments.
>> >
>> > 57. I was thinking that it will be useful to encourage people to use the
>> > new seek() api to get better semantics. Deprecating the old seek api is
>> one
>> > way. I guess we could also just document it for now.
>> >
>> > 60. "Log truncation is detected if the first offset of the epoch for the
>> > committed offset is larger than this epoch and begins at an earlier
>> > offset." It seems that we should add "that" before "is larger than"?
>> >
>> > Thanks,
>> >
>> > Jun
>> >
>> >
>> > On Mon, Aug 6, 2018 at 3:07 PM, Jason Gustafson 
>> > wrote:
>> >
>> > > Hi Jun,
>> > >
>> > > I spent a little more time looking at the usage in WorkerSinkTask. I
>> > think
>> > > actually the initialization of the positions in the assignment
>> callback
>> > is
>> > > not strictly necessary. We keep a map of the current consumed offsets
>> > which
>> > > is updated as we consume the data. As far as I can tell, we could
>> either
>> > > skip the initialization and wait until the first fetched records come
>> in
>> > or
>> > > we could use the committed() API to initialize positions. I think the
>> > root
>> > > of it is the argument Anna made previously. The leader epoch lets us
>> > track
>> > > the history of records that we have consumed. It is only useful when
>> we
>> > > want to tell whether records we have consumed were lost. So getting
>> the
>> > > leader epoch of an arbitrary position that was seeked doesn't really
>> make
>> > > sense. The dependence on the consumed records is most explicit if we
>> only
>> > > expose the 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-07 Thread Jason Gustafson
Hey Jun,

57. It's a fair point. I could go either way, but I'm slightly inclined to
just document the new API for now. We'll still support seeking to an offset
with corresponding epoch information, so deprecating the old seek() seems
like overkill.

60. The phrasing was a little confusing. Does this sound better?

"Log truncation is detected if there exists a leader epoch which is larger than
this epoch and begins at an offset earlier than the committed offset."
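
In other words, a tiny sketch of the check (plain values only, not any actual
broker or consumer API):

// Truncation is detected for a committed (offset, epoch) pair if the log's
// epoch history (epoch -> start offset) contains a larger epoch that starts
// before the committed offset.
static boolean truncationDetected(long committedOffset,
                                  int committedEpoch,
                                  java.util.Map<Integer, Long> epochStartOffsets) {
    for (java.util.Map.Entry<Integer, Long> entry : epochStartOffsets.entrySet()) {
        if (entry.getKey() > committedEpoch && entry.getValue() < committedOffset)
            return true;
    }
    return false;
}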

Thanks,
Jason


On Tue, Aug 7, 2018 at 12:11 PM, Dong Lin  wrote:

> Hey Jason,
>
> Thanks for the update. I have some comments below:
>
> 1) Since FencedLeaderEpochException indicates that the metadata in the
> client is outdated, should it extend InvalidMetadataException?
>
> 2) It is mentioned that "To fix the problem with KIP-232, we will add the
> leader epoch to the ListOffsets response. The consumer will use this in its
> first fetch request after resetting offsets". If consumer sends
> ListOffsetRequest to the broker who is no longer the leader, and the broker
> still thinks it is the leader, then the broker may return ListOffsetResponse
> whose leaderEpoch is smaller than the leaderEpoch in the metadata of the
> consumer. In this case consumer probably should not just send FetchRequest
> with the leaderEpoch of the ListOffsetResponse, right? I am wondering
> whether we should also include CurrentLeaderEpoch in the ListOffsetRequest.
>
> 3) Currently the new field added in the OffsetCommitRequest/
> OffsetFetchResponse is named LastLeaderEpoch. For the same reason that we
> are not naming the existing field "Offset" as "LastOffset", would it be
> more consistent to just name the new field as LeaderEpoch? Same for the new
> API in the class OffsetAndMetadata.
>
> 4) Could we clarify in the KIP where the value of CurrentLeaderEpoch in the
> FetchRequest comes from? I suppose this value can be updated by the
> MetadataResponse, right? If so, maybe we should also clarify that client
> should reject MetadataResponse if the leaderEpoch in the metadata response
> is smaller than what the client also knows from e.g.
> seek(...), OffsetFetchResponse?
>
>
> Thanks,
> Dong
>
>
> On Mon, Aug 6, 2018 at 5:30 PM, Jun Rao  wrote:
>
> > Hi, Jason,
> >
> > Thanks for the reply. They all make sense. Just a couple of more minor
> > comments.
> >
> > 57. I was thinking that it will be useful to encourage people to use the
> > new seek() api to get better semantics. Deprecating the old seek api is
> one
> > way. I guess we could also just document it for now.
> >
> > 60. "Log truncation is detected if the first offset of the epoch for the
> > committed offset is larger than this epoch and begins at an earlier
> > offset." It seems that we should add "that" before "is larger than"?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Aug 6, 2018 at 3:07 PM, Jason Gustafson 
> > wrote:
> >
> > > Hi Jun,
> > >
> > > I spent a little more time looking at the usage in WorkerSinkTask. I
> > think
> > > actually the initialization of the positions in the assignment callback
> > is
> > > not strictly necessary. We keep a map of the current consumed offsets
> > which
> > > is updated as we consume the data. As far as I can tell, we could
> either
> > > skip the initialization and wait until the first fetched records come
> in
> > or
> > > we could use the committed() API to initialize positions. I think the
> > root
> > > of it is the argument Anna made previously. The leader epoch lets us
> > track
> > > the history of records that we have consumed. It is only useful when we
> > > want to tell whether records we have consumed were lost. So getting the
> > > leader epoch of an arbitrary position that was seeked doesn't really
> make
> > > sense. The dependence on the consumed records is most explicit if we
> only
> > > expose the leader epoch inside the fetched records. We might consider
> > > adding a `lastConsumedLeaderEpoch` API to expose it directly, but I'm
> > > inclined to leave that as potential future work.
> > >
> > > A couple additional notes:
> > >
> > > 1. I've renamed OffsetAndMetadata.leaderEpoch to
> > > OffsetAndMetadata.lastLeaderEpoch. In fact, the consumer doesn't know
> > what
> > > the leader epoch of the committed offset should be, so this just
> > clarifies
> > > the expected usage.
> > >
> > > 2. I decided to add a helper to ConsumerRecords to get the next
> offsets.
> > We
> > > would use this in WorkerSinkTask and external storage use cases to
> > simplify
> > > the commit logic. If we are consuming batch by batch, then we don't
> need
> > > the message-level bookkeeping.
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Mon, Aug 6, 2018 at 10:28 AM, Jason Gustafson 
> > > wrote:
> > >
> > > > Hey Jun,
> > > >
> > > > Thanks for the review. Responses below:
> > > >
> > > > 50. Yes, that is right. I clarified this in the KIP.
> > > >
> > > > 51. Yes, updated the KIP to mention.
> > > >
> > > > 52. Yeah, this was a reference to a previous iteration. I've 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-07 Thread Dong Lin
Hey Jason,

Thanks for the update. I have some comments below:

1) Since FencedLeaderEpochException indicates that the metadata in the
client is outdated, should it extend InvalidMetadataException?

2) It is mentioned that "To fix the problem with KIP-232, we will add the
leader epoch to the ListOffsets response. The consumer will use this in its
first fetch request after resetting offsets". If consumer sends
ListOffsetRequest to the broker who is no longer the leader, and the broker
still thinks it is the leader, then the broker may return ListOffsetResponse
whose leaderEpoch is smaller than the leaderEpoch in the metadata of the
consumer. In this case consumer probably should not just send FetchRequest
with the leaderEpoch of the ListOffsetResponse, right? I am wondering
whether we should also include CurrentLeaderEpoch in the ListOffsetRequest.

3) Currently the new field added in the OffsetCommitRequest/
OffsetFetchResponse is named LastLeaderEpoch. For the same reason that we
are not naming the existing field "Offset" as "LastOffset", would it be
more consistent to just name the new field as LeaderEpoch? Same for the new
API in the class OffsetAndMetadata.

4) Could we clarify in the KIP where the value of CurrentLeaderEpoch in the
FetchRequest comes from? I suppose this value can be updated by the
MetadataResponse, right? If so, maybe we should also clarify that client
should reject MetadataResponse if the leaderEpoch in the metadata response
is smaller than what the client also knows from e.g.
seek(...), OffsetFetchResponse?
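
To illustrate the kind of client-side guard I have in mind for 2) and 4) above
(just a sketch, not a concrete API proposal):

// highestKnownEpoch is the largest leader epoch the client has seen for the
// partition so far, e.g. from metadata, seek(...) or OffsetFetchResponse.
// An epoch in a ListOffsetResponse (or MetadataResponse) smaller than this
// likely came from a stale broker and should not be used for fetching.
static boolean shouldAcceptEpoch(int epochFromResponse, int highestKnownEpoch) {
    return epochFromResponse >= highestKnownEpoch;
}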


Thanks,
Dong


On Mon, Aug 6, 2018 at 5:30 PM, Jun Rao  wrote:

> Hi, Jason,
>
> Thanks for the reply. They all make sense. Just a couple of more minor
> comments.
>
> 57. I was thinking that it will be useful to encourage people to use the
> new seek() api to get better semantics. Deprecating the old seek api is one
> way. I guess we could also just document it for now.
>
> 60. "Log truncation is detected if the first offset of the epoch for the
> committed offset is larger than this epoch and begins at an earlier
> offset." It seems that we should add "that" before "is larger than"?
>
> Thanks,
>
> Jun
>
>
> On Mon, Aug 6, 2018 at 3:07 PM, Jason Gustafson 
> wrote:
>
> > Hi Jun,
> >
> > I spent a little more time looking at the usage in WorkerSinkTask. I
> think
> > actually the initialization of the positions in the assignment callback
> is
> > not strictly necessary. We keep a map of the current consumed offsets
> which
> > is updated as we consume the data. As far as I can tell, we could either
> > skip the initialization and wait until the first fetched records come in
> or
> > we could use the committed() API to initialize positions. I think the
> root
> > of it is the argument Anna made previously. The leader epoch lets us
> track
> > the history of records that we have consumed. It is only useful when we
> > want to tell whether records we have consumed were lost. So getting the
> > leader epoch of an arbitrary position that was seeked doesn't really make
> > sense. The dependence on the consumed records is most explicit if we only
> > expose the leader epoch inside the fetched records. We might consider
> > adding a `lastConsumedLeaderEpoch` API to expose it directly, but I'm
> > inclined to leave that as potential future work.
> >
> > A couple additional notes:
> >
> > 1. I've renamed OffsetAndMetadata.leaderEpoch to
> > OffsetAndMetadata.lastLeaderEpoch. In fact, the consumer doesn't know
> what
> > the leader epoch of the committed offset should be, so this just
> clarifies
> > the expected usage.
> >
> > 2. I decided to add a helper to ConsumerRecords to get the next offsets.
> We
> > would use this in WorkerSinkTask and external storage use cases to
> simplify
> > the commit logic. If we are consuming batch by batch, then we don't need
> > the message-level bookkeeping.
> >
> > Thanks,
> > Jason
> >
> > On Mon, Aug 6, 2018 at 10:28 AM, Jason Gustafson 
> > wrote:
> >
> > > Hey Jun,
> > >
> > > Thanks for the review. Responses below:
> > >
> > > 50. Yes, that is right. I clarified this in the KIP.
> > >
> > > 51. Yes, updated the KIP to mention.
> > >
> > > 52. Yeah, this was a reference to a previous iteration. I've fixed it.
> > >
> > > 53. I changed the API to use an `Optional<Integer>` for the leader
> epoch
> > > and added a note about the default value. Does that seem reasonable?
> > >
> > > 54. We discussed this above, but could not find a great option. The
> > > options are to add a new API (e.g. positionAndEpoch) or to rely on the
> > user
> > > to get the epoch from the fetched records. We were leaning toward the
> > > latter, but I admit it was not fully satisfying. In this case, Connect
> > > would need to track the last consumed offsets manually instead of
> relying
> > > on the consumer. We also considered adding a convenience method to
> > > ConsumerRecords to get the offset to commit for all fetched partitions.
> > > This makes the additional bookkeeping 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-06 Thread Jun Rao
Hi, Jason,

Thanks for the reply. They all make sense. Just a couple of more minor
comments.

57. I was thinking that it will be useful to encourage people to use the
new seek() api to get better semantics. Deprecating the old seek api is one
way. I guess we could also just document it for now.

60. "Log truncation is detected if the first offset of the epoch for the
committed offset is larger than this epoch and begins at an earlier
offset." It seems that we should add "that" before "is larger than"?

Thanks,

Jun


On Mon, Aug 6, 2018 at 3:07 PM, Jason Gustafson  wrote:

> Hi Jun,
>
> I spent a little more time looking at the usage in WorkerSinkTask. I think
> actually the initialization of the positions in the assignment callback is
> not strictly necessary. We keep a map of the current consumed offsets which
> is updated as we consume the data. As far as I can tell, we could either
> skip the initialization and wait until the first fetched records come in or
> we could use the committed() API to initialize positions. I think the root
> of it is the argument Anna made previously. The leader epoch lets us track
> the history of records that we have consumed. It is only useful when we
> want to tell whether records we have consumed were lost. So getting the
> leader epoch of an arbitrary position that was seeked doesn't really make
> sense. The dependence on the consumed records is most explicit if we only
> expose the leader epoch inside the fetched records. We might consider
> adding a `lastConsumedLeaderEpoch` API to expose it directly, but I'm
> inclined to leave that as potential future work.
>
> A couple additional notes:
>
> 1. I've renamed OffsetAndMetadata.leaderEpoch to
> OffsetAndMetadata.lastLeaderEpoch. In fact, the consumer doesn't know what
> the leader epoch of the committed offset should be, so this just clarifies
> the expected usage.
>
> 2. I decided to add a helper to ConsumerRecords to get the next offsets. We
> would use this in WorkerSinkTask and external storage use cases to simplify
> the commit logic. If we are consuming batch by batch, then we don't need
> the message-level bookkeeping.
>
> Thanks,
> Jason
>
> On Mon, Aug 6, 2018 at 10:28 AM, Jason Gustafson 
> wrote:
>
> > Hey Jun,
> >
> > Thanks for the review. Responses below:
> >
> > 50. Yes, that is right. I clarified this in the KIP.
> >
> > 51. Yes, updated the KIP to mention.
> >
> > 52. Yeah, this was a reference to a previous iteration. I've fixed it.
> >
> > 53. I changed the API to use an `Optional<Integer>` for the leader epoch
> > and added a note about the default value. Does that seem reasonable?
> >
> > 54. We discussed this above, but could not find a great option. The
> > options are to add a new API (e.g. positionAndEpoch) or to rely on the
> user
> > to get the epoch from the fetched records. We were leaning toward the
> > latter, but I admit it was not fully satisfying. In this case, Connect
> > would need to track the last consumed offsets manually instead of relying
> > on the consumer. We also considered adding a convenience method to
> > ConsumerRecords to get the offset to commit for all fetched partitions.
> > This makes the additional bookkeeping pretty minimal. What do you think?
> >
> > 55. I clarified in the KIP. I was mainly thinking of situations where a
> > previously valid offset becomes out of range.
> >
> > 56. Yeah, that's a bit annoying. I decided to keep LeaderEpoch as it is
> > and use CurrentLeaderEpoch for both OffsetForLeaderEpoch and the Fetch
> > APIs. I think Dong suggested this previously as well.
> >
> > 57. We could, but I'm not sure there's a strong reason to do so. I was
> > thinking we would leave it around for convenience, but let me know if you
> > think we should do otherwise.
> >
> >
> > Thanks,
> > Jason
> >
> >
> > On Fri, Aug 3, 2018 at 4:49 PM, Jun Rao  wrote:
> >
> >> Hi, Jason,
> >>
> >> Thanks for the updated KIP. Well thought-through. Just a few minor
> >> comments
> >> below.
> >>
> >> 50. For seek(TopicPartition partition, OffsetAndMetadata offset), I
> guess
> >> under the cover, it will make OffsetsForLeaderEpoch request to determine
> >> if
> >> the seeked offset is still valid before fetching? If so, it will be
> useful
> >> to document this in the wiki.
> >>
> >> 51. Similarly, if the consumer fetch request gets FENCED_LEADER_EPOCH, I
> >> guess the consumer will also make OffsetsForLeaderEpoch request to
> >> determine if the last consumed offset is still valid before fetching? If
> >> so, it will be useful to document this in the wiki.
> >>
> >> 52. "If the consumer seeks to the middle of the log, for example, then
> we
> >> will use the sentinel value -1 and the leader will skip the epoch
> >> validation. " Is this true? If the consumer seeks using
> >> seek(TopicPartition
> >> partition, OffsetAndMetadata offset) and the seeked offset is valid, the
> >> consumer can/should use the leaderEpoch in the cached metadata for
> >> fetching?
> >>
> >> 53. OffsetAndMetadata. 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-06 Thread Jason Gustafson
Hi Jun,

I spent a little more time looking at the usage in WorkerSinkTask. I think
actually the initialization of the positions in the assignment callback is
not strictly necessary. We keep a map of the current consumed offsets which
is updated as we consume the data. As far as I can tell, we could either
skip the initialization and wait until the first fetched records come in or
we could use the committed() API to initialize positions. I think the root
of it is the argument Anna made previously. The leader epoch lets us track
the history of records that we have consumed. It is only useful when we
want to tell whether records we have consumed were lost. So getting the
leader epoch of an arbitrary position that was seeked doesn't really make
sense. The dependence on the consumed records is most explicit if we only
expose the leader epoch inside the fetched records. We might consider
adding a `lastConsumedLeaderEpoch` API to expose it directly, but I'm
inclined to leave that as potential future work.

A couple additional notes:

1. I've renamed OffsetAndMetadata.leaderEpoch to
OffsetAndMetadata.lastLeaderEpoch. In fact, the consumer doesn't know what
the leader epoch of the committed offset should be, so this just clarifies
the expected usage.

2. I decided to add a helper to ConsumerRecords to get the next offsets. We
would use this in WorkerSinkTask and external storage use cases to simplify
the commit logic. If we are consuming batch by batch, then we don't need
the message-level bookkeeping.
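
For example, a sink application could derive its commit offsets directly from
each fetched batch, roughly like this (sketch only: it assumes the fetched
records expose their leader epoch and that OffsetAndMetadata can carry it as
proposed, and the exact helper on ConsumerRecords is still open):

// Compute the next offset to commit per partition from a polled batch, carrying
// the epoch of the last consumed record for truncation detection.
// (Uses java.util.{Map, HashMap, List} and the consumer classes from
// org.apache.kafka.clients.consumer.)
static Map<TopicPartition, OffsetAndMetadata> offsetsToCommit(ConsumerRecords<byte[], byte[]> records) {
    Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
    for (TopicPartition tp : records.partitions()) {
        List<ConsumerRecord<byte[], byte[]>> batch = records.records(tp);
        ConsumerRecord<byte[], byte[]> last = batch.get(batch.size() - 1);
        result.put(tp, new OffsetAndMetadata(last.offset() + 1, last.leaderEpoch(), ""));
    }
    return result;
}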

Thanks,
Jason

On Mon, Aug 6, 2018 at 10:28 AM, Jason Gustafson  wrote:

> Hey Jun,
>
> Thanks for the review. Responses below:
>
> 50. Yes, that is right. I clarified this in the KIP.
>
> 51. Yes, updated the KIP to mention.
>
> 52. Yeah, this was a reference to a previous iteration. I've fixed it.
>
> 53. I changed the API to use an `Optional<Integer>` for the leader epoch
> and added a note about the default value. Does that seem reasonable?
>
> 54. We discussed this above, but could not find a great option. The
> options are to add a new API (e.g. positionAndEpoch) or to rely on the user
> to get the epoch from the fetched records. We were leaning toward the
> latter, but I admit it was not fully satisfying. In this case, Connect
> would need to track the last consumed offsets manually instead of relying
> on the consumer. We also considered adding a convenience method to
> ConsumerRecords to get the offset to commit for all fetched partitions.
> This makes the additional bookkeeping pretty minimal. What do you think?
>
> 55. I clarified in the KIP. I was mainly thinking of situations where a
> previously valid offset becomes out of range.
>
> 56. Yeah, that's a bit annoying. I decided to keep LeaderEpoch as it is
> and use CurrentLeaderEpoch for both OffsetForLeaderEpoch and the Fetch
> APIs. I think Dong suggested this previously as well.
>
> 57. We could, but I'm not sure there's a strong reason to do so. I was
> thinking we would leave it around for convenience, but let me know if you
> think we should do otherwise.
>
>
> Thanks,
> Jason
>
>
> On Fri, Aug 3, 2018 at 4:49 PM, Jun Rao  wrote:
>
>> Hi, Jason,
>>
>> Thanks for the updated KIP. Well thought-through. Just a few minor
>> comments
>> below.
>>
>> 50. For seek(TopicPartition partition, OffsetAndMetadata offset), I guess
>> under the cover, it will make OffsetsForLeaderEpoch request to determine
>> if
>> the seeked offset is still valid before fetching? If so, it will be useful
>> to document this in the wiki.
>>
>> 51. Similarly, if the consumer fetch request gets FENCED_LEADER_EPOCH, I
>> guess the consumer will also make OffsetsForLeaderEpoch request to
>> determine if the last consumed offset is still valid before fetching? If
>> so, it will be useful to document this in the wiki.
>>
>> 52. "If the consumer seeks to the middle of the log, for example, then we
>> will use the sentinel value -1 and the leader will skip the epoch
>> validation. " Is this true? If the consumer seeks using
>> seek(TopicPartition
>> partition, OffsetAndMetadata offset) and the seeked offset is valid, the
>> consumer can/should use the leaderEpoch in the cached metadata for
>> fetching?
>>
>> 53. OffsetAndMetadata. For backward compatibility, we need to support
>> constructing OffsetAndMetadata without providing leaderEpoch. Could we
>> define the default value of leaderEpoch if not provided and the semantics
>> of that (e.g., skipping the epoch validation)?
>>
>> 54. I saw the following code in WorkerSinkTask in Connect. It saves the
>> offset obtained through position(), which can be committed later. Since
>> position() doesn't return the leaderEpoch, this can lead to committed
>> offset without leaderEpoch. Not sure how common this usage is, but what's
>> the recommendation for such users?
>>
>> private class HandleRebalance implements ConsumerRebalanceListener {
>>     @Override
>>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>>         log.debug("{} 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-06 Thread Jason Gustafson
Hey Jun,

Thanks for the review. Responses below:

50. Yes, that is right. I clarified this in the KIP.

51. Yes, updated the KIP to mention.

52. Yeah, this was a reference to a previous iteration. I've fixed it.

53. I changed the API to use an `Optional<Integer>` for the leader epoch
and added a note about the default value. Does that seem reasonable?

54. We discussed this above, but could not find a great option. The options
are to add a new API (e.g. positionAndEpoch) or to rely on the user to get
the epoch from the fetched records. We were leaning toward the latter, but
I admit it was not fully satisfying. In this case, Connect would need to
track the last consumed offsets manually instead of relying on the
consumer. We also considered adding a convenience method to ConsumerRecords
to get the offset to commit for all fetched partitions. This makes the
additional bookkeeping pretty minimal. What do you think?

55. I clarified in the KIP. I was mainly thinking of situations where a
previously valid offset becomes out of range.

56. Yeah, that's a bit annoying. I decided to keep LeaderEpoch as it is and
use CurrentLeaderEpoch for both OffsetForLeaderEpoch and the Fetch APIs. I
think Dong suggested this previously as well.

57. We could, but I'm not sure there's a strong reason to do so. I was
thinking we would leave it around for convenience, but let me know if you
think we should do otherwise.
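
To illustrate 53, the intended usage is roughly the following (the constructor
shapes mirror the proposal and are not necessarily the final signatures):

OffsetAndMetadata withEpoch = new OffsetAndMetadata(100L, Optional.of(5), "some metadata");
OffsetAndMetadata withoutEpoch = new OffsetAndMetadata(100L); // existing constructor, epoch is Optional.empty()
// An absent leader epoch simply means epoch validation is skipped, which
// preserves the current behaviour for applications that do not provide it.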


Thanks,
Jason


On Fri, Aug 3, 2018 at 4:49 PM, Jun Rao  wrote:

> Hi, Jason,
>
> Thanks for the updated KIP. Well thought-through. Just a few minor comments
> below.
>
> 50. For seek(TopicPartition partition, OffsetAndMetadata offset), I guess
> under the cover, it will make OffsetsForLeaderEpoch request to determine if
> the seeked offset is still valid before fetching? If so, it will be useful
> to document this in the wiki.
>
> 51. Similarly, if the consumer fetch request gets FENCED_LEADER_EPOCH, I
> guess the consumer will also make OffsetsForLeaderEpoch request to
> determine if the last consumed offset is still valid before fetching? If
> so, it will be useful to document this in the wiki.
>
> 52. "If the consumer seeks to the middle of the log, for example, then we
> will use the sentinel value -1 and the leader will skip the epoch
> validation. " Is this true? If the consumer seeks using seek(TopicPartition
> partition, OffsetAndMetadata offset) and the seeked offset is valid, the
> consumer can/should use the leaderEpoch in the cached metadata for
> fetching?
>
> 53. OffsetAndMetadata. For backward compatibility, we need to support
> constructing OffsetAndMetadata without providing leaderEpoch. Could we
> define the default value of leaderEpoch if not provided and the semantics
> of that (e.g., skipping the epoch validation)?
>
> 54. I saw the following code in WorkerSinkTask in Connect. It saves the
> offset obtained through position(), which can be committed later. Since
> position() doesn't return the leaderEpoch, this can lead to committed
> offset without leaderEpoch. Not sure how common this usage is, but what's
> the recommendation for such users?
>
> private class HandleRebalance implements ConsumerRebalanceListener {
>     @Override
>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>         log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
>         lastCommittedOffsets = new HashMap<>();
>         currentOffsets = new HashMap<>();
>         for (TopicPartition tp : partitions) {
>             long pos = consumer.position(tp);
>             lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
>
> 55. "With this KIP, the only case in which this is possible is if the
> consumer fetches from an offset earlier than the log start offset." Is that
> true? I guess a user could seek to a large offset without providing
> leaderEpoch, which can cause the offset to be larger than the log end
> offset during fetch?
>
> 56. In the schema for OffsetForLeaderEpochRequest, LeaderEpoch seems to be
> an existing field. Is LeaderEpochQuery the new field? The name is not very
> intuitive. It will be useful to document its meaning.
>
> 57. Should we deprecate the following api?
> void seek(TopicPartition partition, long offset);
>
> Thanks,
>
> Jun
>
>
> On Fri, Aug 3, 2018 at 9:32 AM, Jason Gustafson 
> wrote:
>
> > Hey All,
> >
> > I think I've addressed all pending review. If there is no additional
> > feedback, I'll plan to start a vote thread next week.
> >
> > Thanks,
> > Jason
> >
> > On Tue, Jul 31, 2018 at 9:46 AM, Dong Lin  wrote:
> >
> > > Hey Jason,
> > >
> > > Thanks for your reply. I will comment below.
> > >
> > > Regarding 1, we probably can not simply rename both to `LeaderEpoch`
> > > because we already have a LeaderEpoch field in OffsetsForLeaderEpoch.
> > >
> > > Regarding 5, I am not strong on this. I agree with the two benefits of
> > > having two error codes: 1) not having to refresh metadata when consumer
> > > sees UNKNOWN_LEADER_EPOCH and 2) provide 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-03 Thread Jun Rao
Hi, Jason,

Thanks for the updated KIP. Well thought-through. Just a few minor comments
below.

50. For seek(TopicPartition partition, OffsetAndMetadata offset), I guess
under the cover, it will make OffsetsForLeaderEpoch request to determine if
the seeked offset is still valid before fetching? If so, it will be useful
to document this in the wiki.

51. Similarly, if the consumer fetch request gets FENCED_LEADER_EPOCH, I
guess the consumer will also make OffsetsForLeaderEpoch request to
determine if the last consumed offset is still valid before fetching? If
so, it will be useful to document this in the wiki.

52. "If the consumer seeks to the middle of the log, for example, then we
will use the sentinel value -1 and the leader will skip the epoch
validation. " Is this true? If the consumer seeks using seek(TopicPartition
partition, OffsetAndMetadata offset) and the seeked offset is valid, the
consumer can/should use the leaderEpoch in the cached metadata for fetching?

53. OffsetAndMetadata. For backward compatibility, we need to support
constructing OffsetAndMetadata without providing leaderEpoch. Could we
define the default value of leaderEpoch if not provided and the semantics
of that (e.g., skipping the epoch validation)?

54. I saw the following code in WorkerSinkTask in Connect. It saves the
offset obtained through position(), which can be committed later. Since
position() doesn't return the leaderEpoch, this can lead to committed
offset without leaderEpoch. Not sure how common this usage is, but what's
the recommendation for such users?

private class HandleRebalance implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
        lastCommittedOffsets = new HashMap<>();
        currentOffsets = new HashMap<>();
        for (TopicPartition tp : partitions) {
            long pos = consumer.position(tp);
            lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));

55. "With this KIP, the only case in which this is possible is if the
consumer fetches from an offset earlier than the log start offset." Is that
true? I guess a user could seek to a large offset without providing
leaderEpoch, which can cause the offset to be larger than the log end
offset during fetch?

56. In the schema for OffsetForLeaderEpochRequest, LeaderEpoch seems to be
an existing field. Is LeaderEpochQuery the new field? The name is not very
intuitive. It will be useful to document its meaning.

57. Should we deprecate the following api?
void seek(TopicPartition partition, long offset);
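
For reference, my understanding is that the two seek variants would be used
roughly like this (just a sketch based on the KIP, not final javadoc; tp is a
TopicPartition and consumer a KafkaConsumer):

consumer.seek(tp, 100L);  // existing api: no epoch, so no validation is possible
consumer.seek(tp, new OffsetAndMetadata(100L, Optional.of(5), ""));
// proposed api: carries the last consumed leader epoch, so the consumer can
// validate the position (e.g. via OffsetsForLeaderEpoch) and detect truncation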

Thanks,

Jun


On Fri, Aug 3, 2018 at 9:32 AM, Jason Gustafson  wrote:

> Hey All,
>
> I think I've addressed all pending review. If there is no additional
> feedback, I'll plan to start a vote thread next week.
>
> Thanks,
> Jason
>
> On Tue, Jul 31, 2018 at 9:46 AM, Dong Lin  wrote:
>
> > Hey Jason,
> >
> > Thanks for your reply. I will comment below.
> >
> > Regarding 1, we probably can not simply rename both to `LeaderEpoch`
> > because we already have a LeaderEpoch field in OffsetsForLeaderEpoch.
> >
> > Regarding 5, I am not strong on this. I agree with the two benefits of
> > having two error codes: 1) not having to refresh metadata when consumer
> > sees UNKNOWN_LEADER_EPOCH and 2) provide more information in the log for
> > debugging. Whether or not these two benefits are useful enough for one
> more
> > error code may be subjective. I will let you and others determine this.
> >
> > Regarding 6, yeah overloading seek() looks good to me.
> >
> >
> > Thanks,
> > Dong
> >
> >
> > On Mon, Jul 30, 2018 at 9:33 AM, Jason Gustafson 
> > wrote:
> >
> > > Hey Dong,
> > >
> > > Thanks for the detailed review. Responses below:
> > >
> > > 1/2: Thanks for noticing the inconsistency. Would it be reasonable to
> > > simply call it LeaderEpoch for both APIs?
> > >
> > > 3: I agree it should be a map. I will update.
> > >
> > > 4: Fair point. I think we should always be able to identify an offset.
> > > Let's remove the Optional for now and reconsider if we find an
> unhandled
> > > case during implementation.
> > >
> > > 5: Yeah, I was thinking about this. The two error codes could be
> handled
> > > similarly, so we might merge them. Mainly I was thinking that it will
> be
> > > useful for consumers/replicas to know whether they are ahead or behind
> > the
> > > leader. For example, if a consumer sees UNKNOWN_LEADER_EPOCH, it need
> not
> > > refresh metadata. Or if a replica sees a FENCED_LEADER_EPOCH error, it
> > > could just stop fetching and await the LeaderAndIsr request that it is
> > > missing. It probably also makes debugging a little bit easier. I guess
> > I'm
> > > a bit inclined to keep both error codes, but I'm open to
> reconsideration
> > if
> > > you feel strongly. Another point to consider is whether we should
> > continue
> > > using NOT_LEADER_FOR_PARTITION if a follower receives an 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-03 Thread Jason Gustafson
Hey All,

I think I've addressed all pending review. If there is no additional
feedback, I'll plan to start a vote thread next week.

Thanks,
Jason

On Tue, Jul 31, 2018 at 9:46 AM, Dong Lin  wrote:

> Hey Jason,
>
> Thanks for your reply. I will comment below.
>
> Regarding 1, we probably can not simply rename both to `LeaderEpoch`
> because we already have a LeaderEpoch field in OffsetsForLeaderEpoch.
>
> Regarding 5, I am not strong on this. I agree with the two benefits of
> having two error codes: 1) not having to refresh metadata when consumer
> sees UNKNOWN_LEADER_EPOCH and 2) provide more information in the log for
> debugging. Whether or not these two benefits are useful enough for one more
> error code may be subjective. I will let you and others determine this.
>
> Regarding 6, yeah overloading seek() looks good to me.
>
>
> Thanks,
> Dong
>
>
> On Mon, Jul 30, 2018 at 9:33 AM, Jason Gustafson 
> wrote:
>
> > Hey Dong,
> >
> > Thanks for the detailed review. Responses below:
> >
> > 1/2: Thanks for noticing the inconsistency. Would it be reasonable to
> > simply call it LeaderEpoch for both APIs?
> >
> > 3: I agree it should be a map. I will update.
> >
> > 4: Fair point. I think we should always be able to identify an offset.
> > Let's remove the Optional for now and reconsider if we find an unhandled
> > case during implementation.
> >
> > 5: Yeah, I was thinking about this. The two error codes could be handled
> > similarly, so we might merge them. Mainly I was thinking that it will be
> > useful for consumers/replicas to know whether they are ahead or behind
> the
> > leader. For example, if a consumer sees UNKNOWN_LEADER_EPOCH, it need not
> > refresh metadata. Or if a replica sees a FENCED_LEADER_EPOCH error, it
> > could just stop fetching and await the LeaderAndIsr request that it is
> > missing. It probably also makes debugging a little bit easier. I guess
> I'm
> > a bit inclined to keep both error codes, but I'm open to reconsideration
> if
> > you feel strongly. Another point to consider is whether we should
> continue
> > using NOT_LEADER_FOR_PARTITION if a follower receives an unexpected
> fetch.
> > The leader epoch would be different in this case so we could use one of
> the
> > invalid epoch error codes instead since they contain more information.
> >
> > 6: I agree the name is not ideal in that scenario. What if we overloaded
> > `seek`?
> >
> > 7: Sure, I will mention this.
> >
> >
> > Thanks,
> > Jason
> >
> > On Fri, Jul 27, 2018 at 6:17 PM, Dong Lin  wrote:
> >
> > > Hey Jason,
> > >
> > > Thanks for the update! I agree with the current proposal overall. I
> have
> > > some minor comments related to naming etc.
> > >
> > > 1) I am not strong and will just leave it here for discussion. Would it
> > be
> > > better to rename "CurrentLeaderEpoch" to "ExpectedLeaderEpoch" for the
> > new
> > > field in the OffsetsForLeaderEpochRequest? The reason is that
> > > "CurrentLeaderEpoch" may not necessarily be true current leader epoch
> if
> > > the consumer has stale metadata. "ExpectedLeaderEpoch" shows that this
> > > epoch is what consumer expects on the broker which may or may not be
> the
> > > true value.
> > >
> > > 2) Currently we add the field "LeaderEpoch" to FetchRequest and the
> field
> > > "CurrentLeaderEpoch" to OffsetsForLeaderEpochRequest. Given that both
> > > fields are compared with the leaderEpoch in the broker, would it be
> > better
> > > to give them the same name?
> > >
> > > 3) Currently LogTruncationException.truncationOffset() returns
> > > Optional<OffsetAndMetadata> to user. Should it return
> > > Optional<Map<TopicPartition, OffsetAndMetadata>> to handle the scenario
> > > where leaderEpoch of multiple partitions are different from the
> > leaderEpoch
> > > in the broker?
> > >
> > > 4) Currently LogTruncationException.truncationOffset() returns an
> > Optional
> > > value. Could you explain a bit more when it will return
> > Optional.empty()? I
> > > am trying to understand whether it is simpler and reasonable to
> > > replace Optional.empty()
> > > with OffsetMetadata(offset=last_fetched_offset, leaderEpoch=-1).
> > >
> > > 5) Do we also need to add a new retriable exception for error code
> > > FENCED_LEADER_EPOCH? And do we need to define both FENCED_LEADER_EPOCH
> > > and UNKNOWN_LEADER_EPOCH.
> > > It seems that the current KIP uses these two error codes in the same
> way
> > > and the exception for these two error codes is not exposed to the user.
> > > Maybe we should combine them into one error, e.g. INVALID_LEADER_EPOCH?
> > >
> > > 6) For users who have turned off auto offset reset, when consumer.poll()
> > > throws LogTruncationException, it seems that the user will most likely call
> > > seekToCommitted(offset,
> > > leaderEpoch) where offset and leaderEpoch are obtained from
> > > LogTruncationException.truncationOffset(). In this case, the offset
> used
> > > here is not committed, which is inconsistent with the method name
> > > seekToCommitted(...). Would it be better to rename the method to e.g.

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-31 Thread Dong Lin
Hey Jason,

Thanks for your reply. I will comment below.

Regarding 1, we probably can not simply rename both to `LeaderEpoch`
because we already have a LeaderEpoch field in OffsetsForLeaderEpoch.

Regarding 5, I am not strong on this. I agree with the two benefits of
having two error codes: 1) not having to refresh metadata when consumer
sees UNKNOWN_LEADER_EPOCH and 2) provide more information in the log for
debugging. Whether or not these two benefits are useful enough for one more
error code may be subjective. I will let you and others determine this.

Regarding 6, yeah overloading seek() looks good to me.


Thanks,
Dong


On Mon, Jul 30, 2018 at 9:33 AM, Jason Gustafson  wrote:

> Hey Dong,
>
> Thanks for the detailed review. Responses below:
>
> 1/2: Thanks for noticing the inconsistency. Would it be reasonable to
> simply call it LeaderEpoch for both APIs?
>
> 3: I agree it should be a map. I will update.
>
> 4: Fair point. I think we should always be able to identify an offset.
> Let's remove the Optional for now and reconsider if we find an unhandled
> case during implementation.
>
> 5: Yeah, I was thinking about this. The two error codes could be handled
> similarly, so we might merge them. Mainly I was thinking that it will be
> useful for consumers/replicas to know whether they are ahead or behind the
> leader. For example, if a consumer sees UNKNOWN_LEADER_EPOCH, it need not
> refresh metadata. Or if a replica sees a FENCED_LEADER_EPOCH error, it
> could just stop fetching and await the LeaderAndIsr request that it is
> missing. It probably also makes debugging a little bit easier. I guess I'm
> a bit inclined to keep both error codes, but I'm open to reconsideration if
> you feel strongly. Another point to consider is whether we should continue
> using NOT_LEADER_FOR_PARTITION if a follower receives an unexpected fetch.
> The leader epoch would be different in this case so we could use one of the
> invalid epoch error codes instead since they contain more information.
>
> 6: I agree the name is not ideal in that scenario. What if we overloaded
> `seek`?
>
> 7: Sure, I will mention this.
>
>
> Thanks,
> Jason
>
> On Fri, Jul 27, 2018 at 6:17 PM, Dong Lin  wrote:
>
> > Hey Jason,
> >
> > Thanks for the update! I agree with the current proposal overall. I have
> > some minor comments related to naming etc.
> >
> > 1) I am not strong and will just leave it here for discussion. Would it
> be
> > better to rename "CurrentLeaderEpoch" to "ExpectedLeaderEpoch" for the
> new
> > field in the OffsetsForLeaderEpochRequest? The reason is that
> > "CurrentLeaderEpoch" may not necessarily be true current leader epoch if
> > the consumer has stale metadata. "ExpectedLeaderEpoch" shows that this
> > epoch is what consumer expects on the broker which may or may not be the
> > true value.
> >
> > 2) Currently we add the field "LeaderEpoch" to FetchRequest and the field
> > "CurrentLeaderEpoch" to OffsetsForLeaderEpochRequest. Given that both
> > fields are compared with the leaderEpoch in the broker, would it be
> better
> > to give them the same name?
> >
> > 3) Currently LogTruncationException.truncationOffset() returns
> > Optional<OffsetAndMetadata> to user. Should it return
> > Optional<Map<TopicPartition, OffsetAndMetadata>> to handle the scenario
> > where leaderEpoch of multiple partitions are different from the
> leaderEpoch
> > in the broker?
> >
> > 4) Currently LogTruncationException.truncationOffset() returns an
> Optional
> > value. Could you explain a bit more when it will return
> Optional.empty()? I
> > am trying to understand whether it is simpler and reasonable to
> > replace Optional.empty()
> > with OffsetMetadata(offset=last_fetched_offset, leaderEpoch=-1).
> >
> > 5) Do we also need to add a new retriable exception for error code
> > FENCED_LEADER_EPOCH? And do we need to define both FENCED_LEADER_EPOCH
> > and UNKNOWN_LEADER_EPOCH.
> > It seems that the current KIP uses these two error codes in the same way
> > and the exception for these two error codes is not exposed to the user.
> > Maybe we should combine them into one error, e.g. INVALID_LEADER_EPOCH?
> >
> > 6) For users who have turned off auto offset reset, when consumer.poll()
> > throws LogTruncationException, it seems that the user will most likely call
> > seekToCommitted(offset,
> > leaderEpoch) where offset and leaderEpoch are obtained from
> > LogTruncationException.truncationOffset(). In this case, the offset used
> > here is not committed, which is inconsistent from the method name
> > seekToCommitted(...). Would it be better to rename the method to e.g.
> > seekToLastConsumedMessage()?
> >
> > 7) Per point 3 in Jun's comment, would it be useful to explicitly specify
> > in the KIP that we will log the truncation event if user has turned on
> auto
> > offset reset policy?
> >
> >
> > Thanks,
> > Dong
> >
> >
> > On Fri, Jul 27, 2018 at 12:39 PM, Jason Gustafson 
> > wrote:
> >
> > > Thanks Anna, you are right on both points. I updated the KIP.
> > >
> > > -Jason
> > >
> > > On Thu, 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-30 Thread Jason Gustafson
Hey Dong,

Thanks for the detailed review. Responses below:

1/2: Thanks for noticing the inconsistency. Would it be reasonable to
simply call it LeaderEpoch for both APIs?

3: I agree it should be a map. I will update.

4: Fair point. I think we should always be able to identify an offset.
Let's remove the Optional for now and reconsider if we find an unhandled
case during implementation.

5: Yeah, I was thinking about this. The two error codes could be handled
similarly, so we might merge them. Mainly I was thinking that it will be
useful for consumers/replicas to know whether they are ahead or behind the
leader. For example, if a consumer sees UNKNOWN_LEADER_EPOCH, it need not
refresh metadata. Or if a replica sees a FENCED_LEADER_EPOCH error, it
could just stop fetching and await the LeaderAndIsr request that it is
missing. It probably also makes debugging a little bit easier. I guess I'm
a bit inclined to keep both error codes, but I'm open to reconsideration if
you feel strongly. Another point to consider is whether we should continue
using NOT_LEADER_FOR_PARTITION if a follower receives an unexpected fetch.
The leader epoch would be different in this case so we could use one of the
invalid epoch error codes instead since they contain more information. (A
rough sketch of how a client might treat the two epoch errors differently is
included after these responses.)

6: I agree the name is not ideal in that scenario. What if we overloaded
`seek`?

7: Sure, I will mention this.
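
Coming back to (5), here is a rough, purely illustrative sketch of the
distinction (pseudo-Java; not the actual consumer or replica fetcher code, and
the two error codes are the ones proposed in this KIP):

    switch (error) {
        case FENCED_LEADER_EPOCH:
            // The requester's epoch is older than the broker's: its metadata
            // is stale. A consumer would refresh metadata; a replica would
            // stop fetching and wait for the LeaderAndIsr request it is
            // missing.
            break;
        case UNKNOWN_LEADER_EPOCH:
            // The requester's epoch is newer than anything the broker knows:
            // the broker is behind, so refreshing metadata would not help.
            // Simply retry after a backoff.
            break;
        default:
            // All other errors keep their existing handling.
            break;
    }

Collapsing the two codes into one would lose exactly this ability to pick the
cheaper recovery path.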


Thanks,
Jason

On Fri, Jul 27, 2018 at 6:17 PM, Dong Lin  wrote:

> Hey Jason,
>
> Thanks for the update! I agree with the current proposal overall. I have
> some minor comments related to naming etc.
>
> 1) I don't feel strongly about this and will just leave it here for
> discussion. Would it be better to rename "CurrentLeaderEpoch" to
> "ExpectedLeaderEpoch" for the new field in the OffsetsForLeaderEpochRequest?
> The reason is that "CurrentLeaderEpoch" may not necessarily be the true
> current leader epoch if the consumer has stale metadata. "ExpectedLeaderEpoch"
> shows that this epoch is what the consumer expects on the broker, which may or
> may not be the true value.
>
> 2) Currently we add the field "LeaderEpoch" to FetchRequest and the field
> "CurrentLeaderEpoch" to OffsetsForLeaderEpochRequest. Given that both
> fields are compared with the leaderEpoch in the broker, would it be better
> to give them the same name?
>
> 3) Currently LogTruncationException.truncationOffset() returns
> Optional<OffsetMetadata> to the user. Should it return
> Optional<Map<TopicPartition, OffsetMetadata>> to handle the scenario
> where the leaderEpoch of multiple partitions differs from the leaderEpoch
> in the broker?
>
> 4) Currently LogTruncationException.truncationOffset() returns an Optional
> value. Could you explain a bit more when it will return Optional.empty()? I
> am trying to understand whether it is simpler and reasonable to
> replace Optional.empty()
> with OffsetMetadata(offset=last_fetched_offset, leaderEpoch=-1).
>
> 5) Do we also need to add a new retriable exception for error code
> FENCED_LEADER_EPOCH? And do we need to define both FENCED_LEADER_EPOCH
> and UNKNOWN_LEADER_EPOCH?
> It seems that the current KIP uses these two error codes in the same way
> and the exception for these two error codes is not exposed to the user.
> Maybe we should combine them into one error, e.g. INVALID_LEADER_EPOCH?
>
> 6) For users who have turned off auto offset reset, when consumer.poll()
> throws LogTruncationException, it seems that the user will most likely call
> seekToCommitted(offset, leaderEpoch) where offset and leaderEpoch are obtained
> from LogTruncationException.truncationOffset(). In this case, the offset used
> here is not committed, which is inconsistent with the method name
> seekToCommitted(...). Would it be better to rename the method to e.g.
> seekToLastConsumedMessage()?
>
> 7) Per point 3 in Jun's comment, would it be useful to explicitly specify
> in the KIP that we will log the truncation event if the user has turned on the
> auto offset reset policy?
>
>
> Thanks,
> Dong
>
>
> On Fri, Jul 27, 2018 at 12:39 PM, Jason Gustafson 
> wrote:
>
> > Thanks Anna, you are right on both points. I updated the KIP.
> >
> > -Jason
> >
> > On Thu, Jul 26, 2018 at 2:08 PM, Anna Povzner  wrote:
> >
> > > Hi Jason,
> > >
> > > Thanks for the update. I agree with the current proposal.
> > >
> > > Two minor comments:
> > > 1) In “API Changes” section, first paragraph says that “users can catch
> > the
> > > more specific exception type and use the new `seekToNearest()` API
> > defined
> > > below.”. Since LogTruncationException “will include the partitions that
> > > were truncated and the offset of divergence”, shouldn’t the client use
> > > seek(offset) to seek to the offset of divergence in response to the
> > > exception?
> > > 2) In “Protocol Changes” section, OffsetsForLeaderEpoch subsection says
> > > “Note
> > > that consumers will send a sentinel value (-1) for the current epoch
> and
> > > the broker will simply disregard that validation.”. Is that still true
> > with
> > > MetadataResponse containing leader epoch?
> > 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-27 Thread Dong Lin
Hey Jason,

Thanks for the update! I agree with the current proposal overall. I have
some minor comments related to naming etc.

1) I don't feel strongly about this and will just leave it here for
discussion. Would it be better to rename "CurrentLeaderEpoch" to
"ExpectedLeaderEpoch" for the new field in the OffsetsForLeaderEpochRequest?
The reason is that "CurrentLeaderEpoch" may not necessarily be the true
current leader epoch if the consumer has stale metadata. "ExpectedLeaderEpoch"
shows that this epoch is what the consumer expects on the broker, which may or
may not be the true value.

2) Currently we add the field "LeaderEpoch" to FetchRequest and the field
"CurrentLeaderEpoch" to OffsetsForLeaderEpochRequest. Given that both
fields are compared with the leaderEpoch in the broker, would it be better
to give them the same name?

3) Currently LogTruncationException.truncationOffset() returns
Optional<OffsetMetadata> to the user. Should it return
Optional<Map<TopicPartition, OffsetMetadata>> to handle the scenario
where the leaderEpoch of multiple partitions differs from the leaderEpoch
in the broker?

4) Currently LogTruncationException.truncationOffset() returns an Optional
value. Could you explain a bit more when it will return Optional.empty()? I
am trying to understand whether it is simpler and reasonable to
replace Optional.empty()
with OffsetMetadata(offset=last_fetched_offset, leaderEpoch=-1).

5) Do we also need to add a new retriable exception for error code
FENCED_LEADER_EPOCH? And do we need to define both FENCED_LEADER_EPOCH
and UNKNOWN_LEADER_EPOCH?
It seems that the current KIP uses these two error codes in the same way
and the exception for these two error codes is not exposed to the user.
Maybe we should combine them into one error, e.g. INVALID_LEADER_EPOCH?

6) For users who have turned off auto offset reset, when consumer.poll()
throws LogTruncationException, it seems that the user will most likely call
seekToCommitted(offset, leaderEpoch) where offset and leaderEpoch are obtained
from LogTruncationException.truncationOffset(). In this case, the offset used
here is not committed, which is inconsistent with the method name
seekToCommitted(...). Would it be better to rename the method to e.g.
seekToLastConsumedMessage()?

7) Per point 3 in Jun's comment, would it be useful to explicitly specify
in the KIP that we will log the truncation event if the user has turned on the
auto offset reset policy?


Thanks,
Dong


On Fri, Jul 27, 2018 at 12:39 PM, Jason Gustafson 
wrote:

> Thanks Anna, you are right on both points. I updated the KIP.
>
> -Jason
>
> On Thu, Jul 26, 2018 at 2:08 PM, Anna Povzner  wrote:
>
> > Hi Jason,
> >
> > Thanks for the update. I agree with the current proposal.
> >
> > Two minor comments:
> > 1) In “API Changes” section, first paragraph says that “users can catch
> the
> > more specific exception type and use the new `seekToNearest()` API
> defined
> > below.”. Since LogTruncationException “will include the partitions that
> > were truncated and the offset of divergence”, shouldn’t the client use
> > seek(offset) to seek to the offset of divergence in response to the
> > exception?
> > 2) In “Protocol Changes” section, OffsetsForLeaderEpoch subsection says
> > “Note
> > that consumers will send a sentinel value (-1) for the current epoch and
> > the broker will simply disregard that validation.”. Is that still true
> with
> > MetadataResponse containing leader epoch?
> >
> > Thanks,
> > Anna
> >
> > On Wed, Jul 25, 2018 at 1:44 PM Jason Gustafson 
> > wrote:
> >
> > > Hi All,
> > >
> > > I have made some updates to the KIP. As many of you know, a side
> project
> > of
> > > mine has been specifying the Kafka replication protocol in TLA. You can
> > > check out the code here if you are interested:
> > > https://github.com/hachikuji/kafka-specification. In addition to
> > > uncovering
> > > a couple unknown bugs in the replication protocol (e.g.
> > > https://issues.apache.org/jira/browse/KAFKA-7128), this has helped me
> > > validate the behavior in this KIP. In fact, the original version I
> > proposed
> > > had a weakness. I initially suggested letting the leader validate the
> > > expected epoch at the fetch offset. This made sense for the consumer in
> > the
> > > handling of unclean leader election, but it was not strong enough to
> > > protect the follower in all cases. In order to make advancement of the
> > high
> > > watermark safe, for example, the leader actually needs to be sure that
> > > every follower in the ISR matches its own epoch.
> > >
> > > I attempted to fix this problem by treating the epoch in the fetch
> > request
> > > slightly differently for consumers and followers. For consumers, it
> would
> > > be the expected epoch of the record at the fetch offset, and the leader
> > > would raise a LOG_TRUNCATION error if the expectation failed. For
> > > followers, it would be the current epoch and the leader would require
> > that
> > > it match its own epoch. This was unsatisfying both because of the
> > > inconsistency in behavior and 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-27 Thread Jason Gustafson
Thanks Anna, you are right on both points. I updated the KIP.

-Jason

On Thu, Jul 26, 2018 at 2:08 PM, Anna Povzner  wrote:

> Hi Jason,
>
> Thanks for the update. I agree with the current proposal.
>
> Two minor comments:
> 1) In “API Changes” section, first paragraph says that “users can catch the
> more specific exception type and use the new `seekToNearest()` API defined
> below.”. Since LogTruncationException “will include the partitions that
> were truncated and the offset of divergence”, shouldn’t the client use
> seek(offset) to seek to the offset of divergence in response to the
> exception?
> 2) In “Protocol Changes” section, OffsetsForLeaderEpoch subsection says
> “Note
> that consumers will send a sentinel value (-1) for the current epoch and
> the broker will simply disregard that validation.”. Is that still true with
> MetadataResponse containing leader epoch?
>
> Thanks,
> Anna
>
> On Wed, Jul 25, 2018 at 1:44 PM Jason Gustafson 
> wrote:
>
> > Hi All,
> >
> > I have made some updates to the KIP. As many of you know, a side project
> of
> > mine has been specifying the Kafka replication protocol in TLA. You can
> > check out the code here if you are interested:
> > https://github.com/hachikuji/kafka-specification. In addition to
> > uncovering
> > a couple unknown bugs in the replication protocol (e.g.
> > https://issues.apache.org/jira/browse/KAFKA-7128), this has helped me
> > validate the behavior in this KIP. In fact, the original version I
> proposed
> > had a weakness. I initially suggested letting the leader validate the
> > expected epoch at the fetch offset. This made sense for the consumer in
> the
> > handling of unclean leader election, but it was not strong enough to
> > protect the follower in all cases. In order to make advancement of the
> high
> > watermark safe, for example, the leader actually needs to be sure that
> > every follower in the ISR matches its own epoch.
> >
> > I attempted to fix this problem by treating the epoch in the fetch
> request
> > slightly differently for consumers and followers. For consumers, it would
> > be the expected epoch of the record at the fetch offset, and the leader
> > would raise a LOG_TRUNCATION error if the expectation failed. For
> > followers, it would be the current epoch and the leader would require
> that
> > it match its own epoch. This was unsatisfying both because of the
> > inconsistency in behavior and because the consumer was left with the
> weaker
> > fencing that we already knew was insufficient for the replicas.
> Ultimately
> > I decided that we should make the behavior consistent and that meant that
> > the consumer needed to act more like a following replica. Instead of
> > checking for truncation while fetching, the consumer should check for
> > truncation after leader changes. After checking for truncation, the
> > consumer can then use the current epoch when fetching and get the
> stronger
> > protection that it provides. What this means is that the Metadata API
> must
> > include the current leader epoch. Given the problems we have had around
> > stale metadata and how challenging they have been to debug, I'm convinced
> > that this is a good idea in any case and it resolves the inconsistent
> > behavior in the Fetch API. The downside is that there will be some
> > additional overhead upon leader changes, but I don't think it is a major
> > concern since leader changes are rare and the OffsetForLeaderEpoch
> request
> > is cheap.
> >
> > This approach leaves the door open for some interesting follow up
> > improvements. For example, now that we have the leader epoch in the
> > Metadata request, we can implement similar fencing for the Produce API.
> And
> > now that the consumer can reason about truncation, we could consider
> having
> > a configuration to expose records beyond the high watermark. This would
> let
> > > users trade weaker durability semantics for lower end-to-end latency. It
> is
> > sort of like having an acks=0 option for the consumer. Neither of these
> > options are included in this KIP, I am just mentioning them as potential
> > work for the future.
> >
> > Finally, based on the discussion in this thread, I have added the
> > seekToCommitted API for the consumer. Please take a look and let me know
> > what you think.
> >
> > Thanks,
> > Jason
> >
> > On Fri, Jul 20, 2018 at 2:34 PM, Guozhang Wang 
> wrote:
> >
> > > Hi Jason,
> > >
> > > The proposed API seems reasonable to me too. Could you please also
> update
> > > the wiki page (
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 320%3A+Allow+fetchers+to+detect+and+handle+log+truncation)
> > > with a section, say "workflow", on how the proposed API will be co-used
> > with
> > > others to:
> > >
> > > 1. consumer callers handling a LogTruncationException.
> > > 2. consumer internals for handling a retriable
> > UnknownLeaderEpochException.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Jul 17, 2018 at 10:23 AM, Anna 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-26 Thread Anna Povzner
Hi Jason,

Thanks for the update. I agree with the current proposal.

Two minor comments:
1) In “API Changes” section, first paragraph says that “users can catch the
more specific exception type and use the new `seekToNearest()` API defined
below.”. Since LogTruncationException “will include the partitions that
were truncated and the offset of divergence”, shouldn’t the client use
seek(offset) to seek to the offset of divergence in response to the
exception?
2) In “Protocol Changes” section, OffsetsForLeaderEpoch subsection says “Note
that consumers will send a sentinel value (-1) for the current epoch and
the broker will simply disregard that validation.”. Is that still true with
MetadataResponse containing leader epoch?

Thanks,
Anna

On Wed, Jul 25, 2018 at 1:44 PM Jason Gustafson  wrote:

> Hi All,
>
> I have made some updates to the KIP. As many of you know, a side project of
> mine has been specifying the Kafka replication protocol in TLA. You can
> check out the code here if you are interested:
> https://github.com/hachikuji/kafka-specification. In addition to
> uncovering
> a couple unknown bugs in the replication protocol (e.g.
> https://issues.apache.org/jira/browse/KAFKA-7128), this has helped me
> validate the behavior in this KIP. In fact, the original version I proposed
> had a weakness. I initially suggested letting the leader validate the
> expected epoch at the fetch offset. This made sense for the consumer in the
> handling of unclean leader election, but it was not strong enough to
> protect the follower in all cases. In order to make advancement of the high
> watermark safe, for example, the leader actually needs to be sure that
> every follower in the ISR matches its own epoch.
>
> I attempted to fix this problem by treating the epoch in the fetch request
> slightly differently for consumers and followers. For consumers, it would
> be the expected epoch of the record at the fetch offset, and the leader
> would raise a LOG_TRUNCATION error if the expectation failed. For
> followers, it would be the current epoch and the leader would require that
> it match its own epoch. This was unsatisfying both because of the
> inconsistency in behavior and because the consumer was left with the weaker
> fencing that we already knew was insufficient for the replicas. Ultimately
> I decided that we should make the behavior consistent and that meant that
> the consumer needed to act more like a following replica. Instead of
> checking for truncation while fetching, the consumer should check for
> truncation after leader changes. After checking for truncation, the
> consumer can then use the current epoch when fetching and get the stronger
> protection that it provides. What this means is that the Metadata API must
> include the current leader epoch. Given the problems we have had around
> stale metadata and how challenging they have been to debug, I'm convinced
> that this is a good idea in any case and it resolves the inconsistent
> behavior in the Fetch API. The downside is that there will be some
> additional overhead upon leader changes, but I don't think it is a major
> concern since leader changes are rare and the OffsetForLeaderEpoch request
> is cheap.
>
> This approach leaves the door open for some interesting follow up
> improvements. For example, now that we have the leader epoch in the
> Metadata request, we can implement similar fencing for the Produce API. And
> now that the consumer can reason about truncation, we could consider having
> a configuration to expose records beyond the high watermark. This would let
> users trade weaker durability semantics for lower end-to-end latency. It is
> sort of like having an acks=0 option for the consumer. Neither of these
> options are included in this KIP, I am just mentioning them as potential
> work for the future.
>
> Finally, based on the discussion in this thread, I have added the
> seekToCommitted API for the consumer. Please take a look and let me know
> what you think.
>
> Thanks,
> Jason
>
> On Fri, Jul 20, 2018 at 2:34 PM, Guozhang Wang  wrote:
>
> > Hi Jason,
> >
> > The proposed API seems reasonable to me too. Could you please also update
> > the wiki page (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 320%3A+Allow+fetchers+to+detect+and+handle+log+truncation)
> > with a section, say "workflow", on how the proposed API will be co-used
> with
> > others to:
> >
> > 1. consumer callers handling a LogTruncationException.
> > 2. consumer internals for handling a retriable
> UnknownLeaderEpochException.
> >
> >
> > Guozhang
> >
> >
> > On Tue, Jul 17, 2018 at 10:23 AM, Anna Povzner 
> wrote:
> >
> > > Hi Jason,
> > >
> > >
> > > I also like your proposal and agree that
> KafkaConsumer#seekToCommitted()
> > > is
> > > more intuitive as a way to initialize both consumer's position and its
> > > fetch state.
> > >
> > >
> > > My understanding is that KafkaConsumer#seekToCommitted() is purely for
> > > clients
> > > who store 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-25 Thread Jason Gustafson
Hi All,

I have made some updates to the KIP. As many of you know, a side project of
mine has been specifying the Kafka replication protocol in TLA. You can
check out the code here if you are interested:
https://github.com/hachikuji/kafka-specification. In addition to uncovering
a couple unknown bugs in the replication protocol (e.g.
https://issues.apache.org/jira/browse/KAFKA-7128), this has helped me
validate the behavior in this KIP. In fact, the original version I proposed
had a weakness. I initially suggested letting the leader validate the
expected epoch at the fetch offset. This made sense for the consumer in the
handling of unclean leader election, but it was not strong enough to
protect the follower in all cases. In order to make advancement of the high
watermark safe, for example, the leader actually needs to be sure that
every follower in the ISR matches its own epoch.

I attempted to fix this problem by treating the epoch in the fetch request
slightly differently for consumers and followers. For consumers, it would
be the expected epoch of the record at the fetch offset, and the leader
would raise a LOG_TRUNCATION error if the expectation failed. For
followers, it would be the current epoch and the leader would require that
it match its own epoch. This was unsatisfying both because of the
inconsistency in behavior and because the consumer was left with the weaker
fencing that we already knew was insufficient for the replicas. Ultimately
I decided that we should make the behavior consistent and that meant that
the consumer needed to act more like a following replica. Instead of
checking for truncation while fetching, the consumer should check for
truncation after leader changes. After checking for truncation, the
consumer can then use the current epoch when fetching and get the stronger
protection that it provides. What this means is that the Metadata API must
include the current leader epoch. Given the problems we have had around
stale metadata and how challenging they have been to debug, I'm convinced
that this is a good idea in any case and it resolves the inconsistent
behavior in the Fetch API. The downside is that there will be some
additional overhead upon leader changes, but I don't think it is a major
concern since leader changes are rare and the OffsetForLeaderEpoch request
is cheap.
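
To make that flow concrete, here is a rough sketch of the check a consumer
would perform after a metadata update reports a new leader epoch (pseudo-Java
with invented helper names and an assumed LogTruncationException constructor;
the real fetcher internals will differ):

    // lastFetchedEpoch: epoch of the last record this consumer actually
    // consumed; position: the next offset it intends to fetch.
    long epochEndOffset = offsetsForLeaderEpoch(partition, lastFetchedEpoch);
    if (epochEndOffset < position) {
        // The new leader's log diverges before our position, so some of the
        // data we consumed (or were about to consume) has been truncated.
        throw new LogTruncationException(partition, epochEndOffset);
    }
    // Otherwise the position is still valid. Subsequent Fetch requests carry
    // the current leader epoch so that the broker can fence requests that
    // were routed to a stale leader.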

This approach leaves the door open for some interesting follow up
improvements. For example, now that we have the leader epoch in the
Metadata request, we can implement similar fencing for the Produce API. And
now that the consumer can reason about truncation, we could consider having
a configuration to expose records beyond the high watermark. This would let
users trade weaker durability semantics for lower end-to-end latency. It is
sort of like having an acks=0 option for the consumer. Neither of these
options are included in this KIP, I am just mentioning them as potential
work for the future.

Finally, based on the discussion in this thread, I have added the
seekToCommitted API for the consumer. Please take a look and let me know
what you think.

Thanks,
Jason

On Fri, Jul 20, 2018 at 2:34 PM, Guozhang Wang  wrote:

> Hi Jason,
>
> The proposed API seems reasonable to me too. Could you please also update
> the wiki page (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 320%3A+Allow+fetchers+to+detect+and+handle+log+truncation)
> with a section, say "workflow", on how the proposed API will be co-used with
> others to:
>
> 1. consumer callers handling a LogTruncationException.
> 2. consumer internals for handling a retriable UnknownLeaderEpochException.
>
>
> Guozhang
>
>
> On Tue, Jul 17, 2018 at 10:23 AM, Anna Povzner  wrote:
>
> > Hi Jason,
> >
> >
> > I also like your proposal and agree that KafkaConsumer#seekToCommitted()
> > is
> > more intuitive as a way to initialize both consumer's position and its
> > fetch state.
> >
> >
> > My understanding is that KafkaConsumer#seekToCommitted() is purely for
> > clients
> > who store their offsets externally, right? And we are still going to
> > add KafkaConsumer#findOffsets()
> > in this KIP as we discussed, so that the client can handle
> > LogTruncationException?
> >
> >
> > Thanks,
> >
> > Anna
> >
> >
> > On Thu, Jul 12, 2018 at 3:57 PM Dong Lin  wrote:
> >
> > > Hey Jason,
> > >
> > > It is a great summary. The solution sounds good. I might have minor
> > > comments regarding the method name. But we can discuss those minor
> > > points later after we reach consensus on the high level API.
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > > On Thu, Jul 12, 2018 at 11:41 AM, Jason Gustafson 
> > > wrote:
> > >
> > > > Hey Anna and Dong,
> > > >
> > > > Thanks a lot for the great discussion. I've been hanging back a bit
> > > because
> > > > honestly the best option hasn't seemed clear. I agree with Anna's
> > general
> > > > observation that there is a distinction between the position of the
> > > > consumer and its fetch state up to that position. 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-20 Thread Guozhang Wang
Hi Jason,

The proposed API seems reasonable to me too. Could you please also update
the wiki page (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation)
with a section, say "workflow", on how the proposed API will be co-used with
others to:

1. consumer callers handling a LogTruncationException.
2. consumer internals for handling a retriable UnknownLeaderEpochException.
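
For what it's worth, a rough sketch of what workflow (1) could look like from
the caller's side, assuming the exception exposes the per-partition offsets of
divergence as discussed earlier in the thread (the truncationOffsets() accessor
used here is illustrative, not a settled API):

    try {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        // ... normal record processing ...
    } catch (LogTruncationException e) {
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry :
                e.truncationOffsets().entrySet()) {
            // Resume from the last offset that is still valid on the new
            // leader rather than relying on auto offset reset.
            consumer.seek(entry.getKey(), entry.getValue().offset());
        }
    }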


Guozhang


On Tue, Jul 17, 2018 at 10:23 AM, Anna Povzner  wrote:

> Hi Jason,
>
>
> I also like your proposal and agree that KafkaConsumer#seekToCommitted()
> is
> more intuitive as a way to initialize both consumer's position and its
> fetch state.
>
>
> My understanding is that KafkaConsumer#seekToCommitted() is purely for
> clients
> who store their offsets externally, right? And we are still going to
> add KafkaConsumer#findOffsets()
> in this KIP as we discussed, so that the client can handle
> LogTruncationException?
>
>
> Thanks,
>
> Anna
>
>
> On Thu, Jul 12, 2018 at 3:57 PM Dong Lin  wrote:
>
> > Hey Jason,
> >
> > It is a great summary. The solution sounds good. I might have minor
> > comments regarding the method name. But we can discuss those minor points
> > later after we reach consensus on the high level API.
> >
> > Thanks,
> > Dong
> >
> >
> > On Thu, Jul 12, 2018 at 11:41 AM, Jason Gustafson 
> > wrote:
> >
> > > Hey Anna and Dong,
> > >
> > > Thanks a lot for the great discussion. I've been hanging back a bit
> > because
> > > honestly the best option hasn't seemed clear. I agree with Anna's
> general
> > > observation that there is a distinction between the position of the
> > > consumer and its fetch state up to that position. If you think about
> it,
> > a
> > > committed offset actually represents both of these. The metadata is
> used
> > to
> > > initialize the state of the consumer application and the offset
> > initializes
> > > the position. Additionally, we are extending the offset commit in this
> > KIP
> > > to also include the last epoch fetched by the consumer, which is used
> to
> > > initialize the internal fetch state. Of course if you do an arbitrary
> > > `seek` and immediately commit offsets, then there won't be a last epoch
> > to
> > > commit. This seems intuitive since there is no fetch state in this
> case.
> > We
> > > only commit fetch state when we have it.
> > >
> > > So if we think about a committed offset as initializing both the
> > consumer's
> > > position and its fetch state, then the gap in the API is evidently that
> > we
> > > don't have a way to initialize the consumer to a committed offset. We
> do
> > it
> > > implicitly of course for offsets stored in Kafka, but since external
> > > storage is a use case we support, then we should have an explicit API
> as
> > > well. Perhaps something like this:
> > >
> > > seekToCommitted(TopicPartition, OffsetAndMetadata)
> > >
> > > In this KIP, we are proposing to allow the `OffsetAndMetadata` object
> to
> > > include the leader epoch, so I think this would have the same effect as
> > > Anna's suggested `seekToRecord`. But perhaps it is a more natural fit
> > given
> > > the current API? Furthermore, if we find a need for additional metadata
> > in
> > > the offset commit API in the future, then we will just need to modify
> the
> > > `OffsetAndMetadata` object and we will not need a new `seek` API.
> > >
> > > With this approach, I think then we can leave the `position` API as it
> > is.
> > > The position of the consumer is still just the next expected fetch
> > offset.
> > > If a user needs to record additional state based on previous fetch
> > > progress, then they would use the result of the previous fetch to
> obtain
> > > it. This makes the dependence on fetch progress explicit. I think we
> > could
> > > make this a little more convenient with a helper in the
> > `ConsumerRecords`
> > > object, but I think that's more of a nice-to-have.
> > >
> > > Thoughts?
> > >
> > > By the way, I have been iterating a little bit on the replica side of
> > this
> > > KIP. My initial proposal in fact did not have strong enough fencing to
> > > protect all of the edge cases. I believe the current proposal fixes the
> > > problems, but I am still verifying the model.
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > > On Wed, Jul 11, 2018 at 10:45 AM, Dong Lin 
> wrote:
> > >
> > > > Hey Anna,
> > > >
> > > > Thanks much for the explanation. Approach 1 also sounds good to me. I
> > > think
> > > > findOffsets() is useful for users who don't use automatic offset
> reset
> > > > policy.
> > > >
> > > > Just one more question. Since users who store offsets externally need
> > to
> > > > provide leaderEpoch to findOffsets(...), do we need an extra API for
> > user
> > > > to get both offset and leaderEpoch, e.g. recordPosition()?
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Wed, Jul 11, 2018 at 10:12 AM, Anna Povzner 
> > > wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > >
> > > > > 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-17 Thread Anna Povzner
Hi Jason,


I also like your proposal and agree that KafkaConsumer#seekToCommitted() is
more intuitive as a way to initialize both consumer's position and its
fetch state.


My understanding is that KafkaConsumer#seekToCommitted() is purely for clients
who store their offsets externally, right? And we are still going to
add KafkaConsumer#findOffsets()
in this KIP as we discussed, so that the client can handle
LogTruncationException?


Thanks,

Anna


On Thu, Jul 12, 2018 at 3:57 PM Dong Lin  wrote:

> Hey Jason,
>
> It is a great summary. The solution sounds good. I might have minor
> comments regarding the method name. But we can discuss those minor points
> later after we reach consensus on the high level API.
>
> Thanks,
> Dong
>
>
> On Thu, Jul 12, 2018 at 11:41 AM, Jason Gustafson 
> wrote:
>
> > Hey Anna and Dong,
> >
> > Thanks a lot for the great discussion. I've been hanging back a bit
> because
> > honestly the best option hasn't seemed clear. I agree with Anna's general
> > observation that there is a distinction between the position of the
> > consumer and its fetch state up to that position. If you think about it,
> a
> > committed offset actually represents both of these. The metadata is used
> to
> > initialize the state of the consumer application and the offset
> initializes
> > the position. Additionally, we are extending the offset commit in this
> KIP
> > to also include the last epoch fetched by the consumer, which is used to
> > initialize the internal fetch state. Of course if you do an arbitrary
> > `seek` and immediately commit offsets, then there won't be a last epoch
> to
> > commit. This seems intuitive since there is no fetch state in this case.
> We
> > only commit fetch state when we have it.
> >
> > So if we think about a committed offset as initializing both the
> consumer's
> > position and its fetch state, then the gap in the API is evidently that
> we
> > don't have a way to initialize the consumer to a committed offset. We do
> it
> > implicitly of course for offsets stored in Kafka, but since external
> > storage is a use case we support, then we should have an explicit API as
> > well. Perhaps something like this:
> >
> > seekToCommitted(TopicPartition, OffsetAndMetadata)
> >
> > In this KIP, we are proposing to allow the `OffsetAndMetadata` object to
> > include the leader epoch, so I think this would have the same effect as
> > Anna's suggested `seekToRecord`. But perhaps it is a more natural fit
> given
> > the current API? Furthermore, if we find a need for additional metadata
> in
> > the offset commit API in the future, then we will just need to modify the
> > `OffsetAndMetadata` object and we will not need a new `seek` API.
> >
> > With this approach, I think then we can leave the `position` API as it
> is.
> > The position of the consumer is still just the next expected fetch
> offset.
> > If a user needs to record additional state based on previous fetch
> > progress, then they would use the result of the previous fetch to obtain
> > it. This makes the dependence on fetch progress explicit. I think we
> could
> > make this a little more convenient with a helper in the
> `ConsumerRecords`
> > object, but I think that's more of a nice-to-have.
> >
> > Thoughts?
> >
> > By the way, I have been iterating a little bit on the replica side of
> this
> > KIP. My initial proposal in fact did not have strong enough fencing to
> > protect all of the edge cases. I believe the current proposal fixes the
> > problems, but I am still verifying the model.
> >
> > Thanks,
> > Jason
> >
> >
> > On Wed, Jul 11, 2018 at 10:45 AM, Dong Lin  wrote:
> >
> > > Hey Anna,
> > >
> > > Thanks much for the explanation. Approach 1 also sounds good to me. I
> > think
> > > findOffsets() is useful for users who don't use automatic offset reset
> > > policy.
> > >
> > > Just one more question. Since users who store offsets externally need
> to
> > > provide leaderEpoch to findOffsets(...), do we need an extra API for
> user
> > > to get both offset and leaderEpoch, e.g. recordPosition()?
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Wed, Jul 11, 2018 at 10:12 AM, Anna Povzner 
> > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > >
> > > > What I called “not covering all use cases” is what you call
> best-effort
> > > > (not guaranteeing some corner cases). I think we are on the same page
> > > here.
> > > >
> > > >
> > > > I wanted to be clear in the API whether the consumer seeks to a
> > position
> > > > (offset) or to a record (offset, leader epoch). The only use-case of
> > > > seeking to a record is seeking to a committed offset for a user who
> > > stores
> > > > committed offsets externally. (Unless users find some other reason to
> > > seek
> > > > to a record.) I thought it was possible to provide this functionality
> > > with
> > > > findOffset(offset, leader epoch) followed by a seek(offset). However,
> > you
> > > > are right that this will not handle the race condition where
> > > 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-12 Thread Dong Lin
Hey Jason,

It is a great summary. The solution sounds good. I might have minor
comments regarding the method name. But we can discuss those minor points
later after we reach consensus on the high level API.

Thanks,
Dong


On Thu, Jul 12, 2018 at 11:41 AM, Jason Gustafson 
wrote:

> Hey Anna and Dong,
>
> Thanks a lot for the great discussion. I've been hanging back a bit because
> honestly the best option hasn't seemed clear. I agree with Anna's general
> observation that there is a distinction between the position of the
> consumer and its fetch state up to that position. If you think about it, a
> committed offset actually represents both of these. The metadata is used to
> initialize the state of the consumer application and the offset initializes
> the position. Additionally, we are extending the offset commit in this KIP
> to also include the last epoch fetched by the consumer, which is used to
> initialize the internal fetch state. Of course if you do an arbitrary
> `seek` and immediately commit offsets, then there won't be a last epoch to
> commit. This seems intuitive since there is no fetch state in this case. We
> only commit fetch state when we have it.
>
> So if we think about a committed offset as initializing both the consumer's
> position and its fetch state, then the gap in the API is evidently that we
> don't have a way to initialize the consumer to a committed offset. We do it
> implicitly of course for offsets stored in Kafka, but since external
> storage is a use case we support, then we should have an explicit API as
> well. Perhaps something like this:
>
> seekToCommitted(TopicPartition, OffsetAndMetadata)
>
> In this KIP, we are proposing to allow the `OffsetAndMetadata` object to
> include the leader epoch, so I think this would have the same effect as
> Anna's suggested `seekToRecord`. But perhaps it is a more natural fit given
> the current API? Furthermore, if we find a need for additional metadata in
> the offset commit API in the future, then we will just need to modify the
> `OffsetAndMetadata` object and we will not need a new `seek` API.
>
> With this approach, I think then we can leave the `position` API as it is.
> The position of the consumer is still just the next expected fetch offset.
> If a user needs to record additional state based on previous fetch
> progress, then they would use the result of the previous fetch to obtain
> it. This makes the dependence on fetch progress explicit. I think we could
> make this a little more convenient with a helper in the `ConsumerRecords`
> object, but I think that's more of a nice-to-have.
>
> Thoughts?
>
> By the way, I have been iterating a little bit on the replica side of this
> KIP. My initial proposal in fact did not have strong enough fencing to
> protect all of the edge cases. I believe the current proposal fixes the
> problems, but I am still verifying the model.
>
> Thanks,
> Jason
>
>
> On Wed, Jul 11, 2018 at 10:45 AM, Dong Lin  wrote:
>
> > Hey Anna,
> >
> > Thanks much for the explanation. Approach 1 also sounds good to me. I
> think
> > findOffsets() is useful for users who don't use automatic offset reset
> > policy.
> >
> > Just one more question. Since users who store offsets externally need to
> > provide leaderEpoch to findOffsets(...), do we need an extra API for user
> > to get both offset and leaderEpoch, e.g. recordPosition()?
> >
> > Thanks,
> > Dong
> >
> > On Wed, Jul 11, 2018 at 10:12 AM, Anna Povzner 
> wrote:
> >
> > > Hi Dong,
> > >
> > >
> > > What I called “not covering all use cases” is what you call best-effort
> > > (not guaranteeing some corner cases). I think we are on the same page
> > here.
> > >
> > >
> > > I wanted to be clear in the API whether the consumer seeks to a
> position
> > > (offset) or to a record (offset, leader epoch). The only use-case of
> > > seeking to a record is seeking to a committed offset for a user who
> > stores
> > > committed offsets externally. (Unless users find some other reason to
> > seek
> > > to a record.) I thought it was possible to provide this functionality
> > with
> > > findOffset(offset, leader epoch) followed by a seek(offset). However,
> you
> > > are right that this will not handle the race condition where
> > non-divergent
> > > offset found by findOffset() could change again before the consumer
> does
> > > the first fetch.
> > >
> > >
> > > Regarding position() — if we add position that returns (offset, leader
> > > epoch), this is specifically a position after a record that was
> actually
> > > consumed or position of a committed record. In which case, I still
> think
> > > it’s cleaner to get a record position of consumed message from a new
> > helper
> > > method in ConsumerRecords() or from committed offsets.
> > >
> > >
> > > I think all the use-cases could be then covered with:
> > >
> > > (Approach 1)
> > >
> > > seekToRecord(offset, leaderEpoch) — this will just initialize/set the
> > > consumer state;
> > >
> > > 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-12 Thread Jason Gustafson
Hey Anna and Dong,

Thanks a lot for the great discussion. I've been hanging back a bit because
honestly the best option hasn't seemed clear. I agree with Anna's general
observation that there is a distinction between the position of the
consumer and its fetch state up to that position. If you think about it, a
committed offset actually represents both of these. The metadata is used to
initialize the state of the consumer application and the offset initializes
the position. Additionally, we are extending the offset commit in this KIP
to also include the last epoch fetched by the consumer, which is used to
initialize the internal fetch state. Of course if you do an arbitrary
`seek` and immediately commit offsets, then there won't be a last epoch to
commit. This seems intuitive since there is no fetch state in this case. We
only commit fetch state when we have it.

So if we think about a committed offset as initializing both the consumer's
position and its fetch state, then the gap in the API is evidently that we
don't have a way to initialize the consumer to a committed offset. We do it
implicitly of course for offsets stored in Kafka, but since external
storage is a use case we support, then we should have an explicit API as
well. Perhaps something like this:

seekToCommitted(TopicPartition, OffsetAndMetadata)

In this KIP, we are proposing to allow the `OffsetAndMetadata` object to
include the leader epoch, so I think this would have the same effect as
Anna's suggested `seekToRecord`. But perhaps it is a more natural fit given
the current API? Furthermore, if we find a need for additional metadata in
the offset commit API in the future, then we will just need to modify the
`OffsetAndMetadata` object and we will not need a new `seek` API.
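
Just to illustrate, an application that stores offsets externally might use it
on startup roughly like this (a sketch only: seekToCommitted is the proposal
above, and an OffsetAndMetadata constructor carrying the leader epoch is
assumed):

    consumer.assign(Collections.singleton(partition));
    // Initializes both the position (offset) and the fetch state (leader
    // epoch), so truncation can be detected on the very first fetch.
    consumer.seekToCommitted(partition,
        new OffsetAndMetadata(storedOffset, Optional.of(storedLeaderEpoch), ""));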

With this approach, I think then we can leave the `position` API as it is.
The position of the consumer is still just the next expected fetch offset.
If a user needs to record additional state based on previous fetch
progress, then they would use the result of the previous fetch to obtain
it. This makes the dependence on fetch progress explicit. I think we could
make this a little more convenient with a helper in the `ConsumerRecords`
object, but I think that's more of a nice-to-have.
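
As an illustration of the explicit route, the application could capture that
state from the ConsumerRecords returned by poll (called records below). The
leaderEpoch() accessor on the consumed record is assumed here, since exposing
it is part of what this KIP has to add:

    Map<TopicPartition, OffsetAndMetadata> progress = new HashMap<>();
    for (TopicPartition tp : records.partitions()) {
        List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
        ConsumerRecord<String, String> last =
            partitionRecords.get(partitionRecords.size() - 1);
        // Next offset to fetch plus the epoch of the last consumed record.
        progress.put(tp, new OffsetAndMetadata(last.offset() + 1,
                                               last.leaderEpoch(), ""));
    }
    // "progress" is what the application would persist externally and later
    // hand back through seekToCommitted.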

Thoughts?

By the way, I have been iterating a little bit on the replica side of this
KIP. My initial proposal in fact did not have strong enough fencing to
protect all of the edge cases. I believe the current proposal fixes the
problems, but I am still verifying the model.

Thanks,
Jason


On Wed, Jul 11, 2018 at 10:45 AM, Dong Lin  wrote:

> Hey Anna,
>
> Thanks much for the explanation. Approach 1 also sounds good to me. I think
> findOffsets() is useful for users who don't use automatic offset reset
> policy.
>
> Just one more question. Since users who store offsets externally need to
> provide leaderEpoch to findOffsets(...), do we need an extra API for user
> to get both offset and leaderEpoch, e.g. recordPosition()?
>
> Thanks,
> Dong
>
> On Wed, Jul 11, 2018 at 10:12 AM, Anna Povzner  wrote:
>
> > Hi Dong,
> >
> >
> > What I called “not covering all use cases” is what you call best-effort
> > (not guaranteeing some corner cases). I think we are on the same page
> here.
> >
> >
> > I wanted to be clear in the API whether the consumer seeks to a position
> > (offset) or to a record (offset, leader epoch). The only use-case of
> > seeking to a record is seeking to a committed offset for a user who
> stores
> > committed offsets externally. (Unless users find some other reason to
> seek
> > to a record.) I thought it was possible to provide this functionality
> with
> > findOffset(offset, leader epoch) followed by a seek(offset). However, you
> > are right that this will not handle the race condition where
> non-divergent
> > offset found by findOffset() could change again before the consumer does
> > the first fetch.
> >
> >
> > Regarding position() — if we add position that returns (offset, leader
> > epoch), this is specifically a position after a record that was actually
> > consumed or position of a committed record. In which case, I still think
> > it’s cleaner to get a record position of consumed message from a new
> helper
> > method in ConsumerRecords() or from committed offsets.
> >
> >
> > I think all the use-cases could be then covered with:
> >
> > (Approach 1)
> >
> > seekToRecord(offset, leaderEpoch) — this will just initialize/set the
> > consumer state;
> >
> > findOffsets(offset, leaderEpoch) returns {offset, leaderEpoch}
> >
> >
> > If we agree that the race condition is also a corner case, then I think
> we
> > can cover use-cases with:
> >
> > (Approach 2)
> >
> > findOffsets(offset, leaderEpoch) returns offset — we still want leader
> > epoch as a parameter for the users who store their committed offsets
> > externally.
> >
> >
> > I am actually now leaning more to approach 1, since it is more explicit,
> > and maybe there are more use cases for it.
> >
> >
> > 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-11 Thread Dong Lin
Hey Anna,

Thanks much for the explanation. Approach 1 also sounds good to me. I think
findOffsets() is useful for users who don't use automatic offset reset
policy.

Just one more question. Since users who store offsets externally need to
provide leaderEpoch to findOffsets(...), do we need an extra API for user
to get both offset and leaderEpoch, e.g. recordPosition()?

Thanks,
Dong

On Wed, Jul 11, 2018 at 10:12 AM, Anna Povzner  wrote:

> Hi Dong,
>
>
> What I called “not covering all use cases” is what you call best-effort
> (not guaranteeing some corner cases). I think we are on the same page here.
>
>
> I wanted to be clear in the API whether the consumer seeks to a position
> (offset) or to a record (offset, leader epoch). The only use-case of
> seeking to a record is seeking to a committed offset for a user who stores
> committed offsets externally. (Unless users find some other reason to seek
> to a record.) I thought it was possible to provide this functionality with
> findOffset(offset, leader epoch) followed by a seek(offset). However, you
> are right that this will not handle the race condition where non-divergent
> offset found by findOffset() could change again before the consumer does
> the first fetch.
>
>
> Regarding position() — if we add position that returns (offset, leader
> epoch), this is specifically a position after a record that was actually
> consumed or position of a committed record. In which case, I still think
> it’s cleaner to get a record position of consumed message from a new helper
> method in ConsumerRecords() or from committed offsets.
>
>
> I think all the use-cases could be then covered with:
>
> (Approach 1)
>
> seekToRecord(offset, leaderEpoch) — this will just initialize/set the
> consumer state;
>
> findOffsets(offset, leaderEpoch) returns {offset, leaderEpoch}
>
>
> If we agree that the race condition is also a corner case, then I think we
> can cover use-cases with:
>
> (Approach 2)
>
> findOffsets(offset, leaderEpoch) returns offset — we still want leader
> epoch as a parameter for the users who store their committed offsets
> externally.
>
>
> I am actually now leaning more to approach 1, since it is more explicit,
> and maybe there are more use cases for it.
>
>
> Thanks,
>
> Anna
>
>
> On Tue, Jul 10, 2018 at 3:47 PM Dong Lin  wrote:
>
> > Hey Anna,
> >
> > Thanks for the comment. To answer your question, it seems that we can
> cover
> > all cases in this KIP. As stated in "Consumer Handling" section, KIP-101
> > based approach will be used to derive the truncation offset from the
> > 2-tuple (offset, leaderEpoch). This approach is best effort and it is
> > inaccurate only in very rare scenarios (as described in KIP-279).
> >
> > By using seek(offset, leaderEpoch), consumer will still be able to follow
> > this best-effort approach to detect log truncation and determine the
> > truncation offset. On the other hand, if we use seek(offset), consumer
> will
> > not detect log truncation in some cases which weakens the guarantee of
> this
> > KIP. Does this make sense?
> >
> > Thanks,
> > Dong
> >
> > On Tue, Jul 10, 2018 at 3:14 PM, Anna Povzner  wrote:
> >
> > > Sorry, I hit "send" before finishing. Continuing...
> > >
> > >
> > > 2) Hiding most of the consumer handling log truncation logic with
> minimal
> > > exposure in KafkaConsumer API.  I was proposing this path.
> > >
> > >
> > > Before answering your specific questions… I want to answer to your
> > comment
> > > “In general, maybe we should discuss the final solution that covers all
> > > cases?”. With current KIP, we don’t cover all cases of consumer
> detecting
> > > log truncation because the KIP proposes a leader epoch cache in
> consumer
> > > that does not persist across restarts. Plus, we only store last
> committed
> > > offset (either internally or users can store externally). This has a
> > > limitation that the consumer will not always be able to find point of
> > > truncation just because we have a limited history (just one data
> point).
> > >
> > >
> > > So, maybe we should first agree on whether we accept that storing last
> > > committed offset/leader epoch has a limitation that the consumer will
> not
> > > be able to detect log truncation in all cases?
> > >
> > >
> > > Thanks,
> > >
> > > Anna
> > >
> > > On Tue, Jul 10, 2018 at 2:20 PM Anna Povzner 
> wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Thanks for the follow up! I finally have much more clear
> understanding
> > of
> > > > where you are coming from.
> > > >
> > > > You are right. The success of findOffsets()/finding a point of
> > > > non-divergence depends on whether we have enough entries in the
> > > consumer's
> > > > leader epoch cache. However, I think this is a fundamental limitation
> > of
> > > > having a leader epoch cache that does not persist across consumer
> > > restarts.
> > > >
> > > > If we consider the general case where consumer may or may not have
> this
> > > > cache, then I see two paths:
> > > > 1) 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-11 Thread Anna Povzner
Hi Dong,


What I called “not covering all use cases” is what you call best-effort
(not guaranteeing some corner cases). I think we are on the same page here.


I wanted to be clear in the API whether the consumer seeks to a position
(offset) or to a record (offset, leader epoch). The only use-case of
seeking to a record is seeking to a committed offset for a user who stores
committed offsets externally. (Unless users find some other reason to seek
to a record.) I thought it was possible to provide this functionality with
findOffset(offset, leader epoch) followed by a seek(offset). However, you
are right that this will not handle the race condition where non-divergent
offset found by findOffset() could change again before the consumer does
the first fetch.


Regarding position() — if we add position that returns (offset, leader
epoch), this is specifically a position after a record that was actually
consumed or position of a committed record. In which case, I still think
it’s cleaner to get a record position of consumed message from a new helper
method in ConsumerRecords() or from committed offsets.


I think all the use-cases could be then covered with:

(Approach 1)

seekToRecord(offset, leaderEpoch) — this will just initialize/set the
consumer state;

findOffsets(offset, leaderEpoch) returns {offset, leaderEpoch}


If we agree that the race condition is also a corner case, then I think we
can cover use-cases with:

(Approach 2)

findOffsets(offset, leaderEpoch) returns offset — we still want leader
epoch as a parameter for the users who store their committed offsets
externally.


I am actually now leaning more to approach 1, since it is more explicit,
and maybe there are more use cases for it.
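
A rough sketch of how Approach 1 might look for an application that stores its
committed position externally (both methods and the returned type are proposals
in this thread, not an existing API):

    // 1) Ask the broker for the closest non-divergent point, starting from
    //    the externally stored offset and leader epoch.
    OffsetAndEpoch safePoint = consumer.findOffsets(partition, storedOffset, storedEpoch);
    // 2) Initialize both the consumer's position and its fetch state from it.
    consumer.seekToRecord(partition, safePoint.offset(), safePoint.leaderEpoch());
    // Any divergence between the stored point and the current leader's log is
    // resolved up front instead of surfacing later as an error.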


Thanks,

Anna


On Tue, Jul 10, 2018 at 3:47 PM Dong Lin  wrote:

> Hey Anna,
>
> Thanks for the comment. To answer your question, it seems that we can cover
> all cases in this KIP. As stated in "Consumer Handling" section, KIP-101
> based approach will be used to derive the truncation offset from the
> 2-tuple (offset, leaderEpoch). This approach is best effort and it is
> inaccurate only in very rare scenarios (as described in KIP-279).
>
> By using seek(offset, leaderEpoch), consumer will still be able to follow
> this best-effort approach to detect log truncation and determine the
> truncation offset. On the other hand, if we use seek(offset), consumer will
> not detect log truncation in some cases which weakens the guarantee of this
> KIP. Does this make sense?
>
> Thanks,
> Dong
>
> On Tue, Jul 10, 2018 at 3:14 PM, Anna Povzner  wrote:
>
> > Sorry, I hit "send" before finishing. Continuing...
> >
> >
> > 2) Hiding most of the consumer handling log truncation logic with minimal
> > exposure in KafkaConsumer API.  I was proposing this path.
> >
> >
> > Before answering your specific questions… I want to answer to your
> comment
> > “In general, maybe we should discuss the final solution that covers all
> > cases?”. With current KIP, we don’t cover all cases of consumer detecting
> > log truncation because the KIP proposes a leader epoch cache in consumer
> > that does not persist across restarts. Plus, we only store last committed
> > offset (either internally or users can store externally). This has a
> > limitation that the consumer will not always be able to find point of
> > truncation just because we have a limited history (just one data point).
> >
> >
> > So, maybe we should first agree on whether we accept that storing last
> > committed offset/leader epoch has a limitation that the consumer will not
> > be able to detect log truncation in all cases?
> >
> >
> > Thanks,
> >
> > Anna
> >
> > On Tue, Jul 10, 2018 at 2:20 PM Anna Povzner  wrote:
> >
> > > Hi Dong,
> > >
> > > Thanks for the follow up! I finally have much more clear understanding
> of
> > > where you are coming from.
> > >
> > > You are right. The success of findOffsets()/finding a point of
> > > non-divergence depends on whether we have enough entries in the
> > consumer's
> > > leader epoch cache. However, I think this is a fundamental limitation
> of
> > > having a leader epoch cache that does not persist across consumer
> > restarts.
> > >
> > > If we consider the general case where consumer may or may not have this
> > > cache, then I see two paths:
> > > 1) Letting the user track the leader epoch history externally, and
> > have
> > > more exposure to leader epoch and finding point of non-divergence in
> > > KafkaConsumer API. I understand this is the case you were talking
> about.
> > >
> > >
> > >
> > > On Tue, Jul 10, 2018 at 12:16 PM Dong Lin  wrote:
> > >
> > >> Hey Anna,
> > >>
> > >> Thanks much for your detailed explanation and example! It does help me
> > >> understand the difference between our understandings.
> > >>
> > >> So it seems that the solution based on findOffsets() currently focuses
> > >> mainly on the scenario that consumer has cached leaderEpoch -> offset
> > >> mapping whereas I was thinking about the 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-10 Thread Dong Lin
Hey Anna,

Thanks for the comment. To answer your question, it seems that we can cover
all cases in this KIP. As stated in "Consumer Handling" section, KIP-101
based approach will be used to derive the truncation offset from the
2-tuple (offset, leaderEpoch). This approach is best effort and it is
inaccurate only in very rare scenarios (as described in KIP-279).

By using seek(offset, leaderEpoch), consumer will still be able to follow
this best-effort approach to detect log truncation and determine the
truncation offset. On the other hand, if we use seek(offset), consumer will
not detect log truncation in some cases which weakens the guarantee of this
KIP. Does this make sense?

Thanks,
Dong

On Tue, Jul 10, 2018 at 3:14 PM, Anna Povzner  wrote:

> Sorry, I hit "send" before finishing. Continuing...
>
>
> 2) Hiding most of the consumer handling log truncation logic with minimal
> exposure in KafkaConsumer API.  I was proposing this path.
>
>
> Before answering your specific questions… I want to answer to your comment
> “In general, maybe we should discuss the final solution that covers all
> cases?”. With current KIP, we don’t cover all cases of consumer detecting
> log truncation because the KIP proposes a leader epoch cache in consumer
> that does not persist across restarts. Plus, we only store last committed
> offset (either internally or users can store externally). This has a
> limitation that the consumer will not always be able to find point of
> truncation just because we have a limited history (just one data point).
>
>
> So, maybe we should first agree on whether we accept that storing last
> committed offset/leader epoch has a limitation that the consumer will not
> be able to detect log truncation in all cases?
>
>
> Thanks,
>
> Anna
>
> On Tue, Jul 10, 2018 at 2:20 PM Anna Povzner  wrote:
>
> > Hi Dong,
> >
> > Thanks for the follow up! I finally have much more clear understanding of
> > where you are coming from.
> >
> > You are right. The success of findOffsets()/finding a point of
> > non-divergence depends on whether we have enough entries in the
> consumer's
> > leader epoch cache. However, I think this is a fundamental limitation of
> > having a leader epoch cache that does not persist across consumer
> restarts.
> >
> > If we consider the general case where consumer may or may not have this
> > cache, then I see two paths:
> > 1) Letting the user to track the leader epoch history externally, and
> have
> > more exposure to leader epoch and finding point of non-divergence in
> > KafkaConsumer API. I understand this is the case you were talking about.
> >
> >
> >
> > On Tue, Jul 10, 2018 at 12:16 PM Dong Lin  wrote:
> >
> >> Hey Anna,
> >>
> >> Thanks much for your detailed explanation and example! It does help me
> >> understand the difference between our understanding.
> >>
> >> So it seems that the solution based on findOffsets() currently focuses
> >> mainly on the scenario that consumer has cached leaderEpoch -> offset
> >> mapping whereas I was thinking about the general case where consumer may
> >> or
> >> may not have this cache. I guess that is why we have different
> >> understanding here. I have some comments below.
> >>
> >>
> >> 3) The proposed solution using findOffsets(offset, leaderEpoch) followed
> >> by
> >> seek(offset) works if consumer has the cached leaderEpoch -> offset
> >> mapping. But if we assume consumer has this cache, do we need to have
> >> leaderEpoch in the findOffsets(...)? Intuitively, the
> findOffsets(offset)
> >> can also derive the leaderEpoch using offset just like the proposed
> >> solution does with seek(offset).
> >>
> >>
> >> 4) If consumer does not have cached leaderEpoch -> offset mapping, which
> >> is
> >> the case if consumer is restarted on a new machine, then it is not clear
> >> what leaderEpoch would be included in the FetchRequest if consumer does
> >> seek(offset). This is the case that motivates the first question of the
> >> previous email. In general, maybe we should discuss the final solution
> >> that
> >> covers all cases?
> >>
> >>
> >> 5) The second question in my previous email is related to the following
> >> paragraph:
> >>
> >> "... In some cases, offsets returned from position() could be actual
> >> consumed messages by this consumer identified by {offset, leader epoch}.
> >> In
> >> other cases, position() returns offset that was not actually consumed.
> >> Suppose, the user calls position() for the last offset...".
> >>
> >> I guess my point is that, if user calls position() for the last offset
> and
> >> uses that offset in seek(...), then user can probably just call
> >> Consumer#seekToEnd() without calling position() and seek(...). Similarly
> >> user can call Consumer#seekToBeginning() to the seek to the earliest
> >> position without calling position() and seek(...). Thus position() only
> >> needs to return the actual consumed messages identified by {offset,
> leader
> >> epoch}. Does this make sense?
> >>
> >>

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-10 Thread Anna Povzner
Sorry, I hit "send" before finishing. Continuing...


2) Hiding most of the consumer's log truncation handling logic, with minimal
exposure in the KafkaConsumer API. I was proposing this path.


Before answering your specific questions… I want to respond to your comment
“In general, maybe we should discuss the final solution that covers all
cases?”. With current KIP, we don’t cover all cases of consumer detecting
log truncation because the KIP proposes a leader epoch cache in consumer
that does not persist across restarts. Plus, we only store last committed
offset (either internally or users can store externally). This has a
limitation that the consumer will not always be able to find the point of
truncation, because we have a limited history (just one data point).


So, maybe we should first agree on whether we accept that storing last
committed offset/leader epoch has a limitation that the consumer will not
be able to detect log truncation in all cases?


Thanks,

Anna

On Tue, Jul 10, 2018 at 2:20 PM Anna Povzner  wrote:

> Hi Dong,
>
> Thanks for the follow up! I finally have much more clear understanding of
> where you are coming from.
>
> You are right. The success of findOffsets()/finding a point of
> non-divergence depends on whether we have enough entries in the consumer's
> leader epoch cache. However, I think this is a fundamental limitation of
> having a leader epoch cache that does not persist across consumer restarts.
>
> If we consider the general case where consumer may or may not have this
> cache, then I see two paths:
> 1) Letting the user to track the leader epoch history externally, and have
> more exposure to leader epoch and finding point of non-divergence in
> KafkaConsumer API. I understand this is the case you were talking about.
>
>
>
> On Tue, Jul 10, 2018 at 12:16 PM Dong Lin  wrote:
>
>> Hey Anna,
>>
>> Thanks much for your detailed explanation and example! It does help me
>> understand the difference between our understanding.
>>
>> So it seems that the solution based on findOffsets() currently focuses
>> mainly on the scenario that consumer has cached leaderEpoch -> offset
>> mapping whereas I was thinking about the general case where consumer may
>> or
>> may not have this cache. I guess that is why we have different
>> understanding here. I have some comments below.
>>
>>
>> 3) The proposed solution using findOffsets(offset, leaderEpoch) followed
>> by
>> seek(offset) works if consumer has the cached leaderEpoch -> offset
>> mapping. But if we assume consumer has this cache, do we need to have
>> leaderEpoch in the findOffsets(...)? Intuitively, the findOffsets(offset)
>> can also derive the leaderEpoch using offset just like the proposed
>> solution does with seek(offset).
>>
>>
>> 4) If consumer does not have cached leaderEpoch -> offset mapping, which
>> is
>> the case if consumer is restarted on a new machine, then it is not clear
>> what leaderEpoch would be included in the FetchRequest if consumer does
>> seek(offset). This is the case that motivates the first question of the
>> previous email. In general, maybe we should discuss the final solution
>> that
>> covers all cases?
>>
>>
>> 5) The second question in my previous email is related to the following
>> paragraph:
>>
>> "... In some cases, offsets returned from position() could be actual
>> consumed messages by this consumer identified by {offset, leader epoch}.
>> In
>> other cases, position() returns offset that was not actually consumed.
>> Suppose, the user calls position() for the last offset...".
>>
>> I guess my point is that, if user calls position() for the last offset and
>> uses that offset in seek(...), then user can probably just call
>> Consumer#seekToEnd() without calling position() and seek(...). Similarly
> >> user can call Consumer#seekToBeginning() to seek to the earliest
>> position without calling position() and seek(...). Thus position() only
>> needs to return the actual consumed messages identified by {offset, leader
>> epoch}. Does this make sense?
>>
>>
>> Thanks,
>> Dong
>>
>>
>> On Mon, Jul 9, 2018 at 6:47 PM, Anna Povzner  wrote:
>>
>> > Hi Dong,
>> >
>> >
>> > Thanks for considering my suggestions.
>> >
>> >
>> > Based on your comments, I realized that my suggestion was not complete
>> with
>> > regard to KafkaConsumer API vs. consumer-broker protocol. While I
>> propose
>> > to keep KafkaConsumer#seek() unchanged and take offset only, the
>> underlying
>> > consumer will send the next FetchRequest() to broker with offset and
>> > leaderEpoch if it is known (based on leader epoch cache in consumer) —
>> note
>> > that this is different from the current KIP, which suggests to always
>> send
>> > unknown leader epoch after seek(). This way, if the consumer and a
>> broker
>> > agreed on the point of non-divergence, which is some {offset,
>> leaderEpoch}
>> > pair, the new leader which causes another truncation (even further back)
>> > will be able to detect new divergence and restart the 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-10 Thread Anna Povzner
Hi Dong,

Thanks for the follow up! I finally have a much clearer understanding of
where you are coming from.

You are right. The success of findOffsets()/finding a point of
non-divergence depends on whether we have enough entries in the consumer's
leader epoch cache. However, I think this is a fundamental limitation of
having a leader epoch cache that does not persist across consumer restarts.

If we consider the general case where consumer may or may not have this
cache, then I see two paths:
1) Letting the user track the leader epoch history externally, and have
more exposure to leader epoch and finding point of non-divergence in
KafkaConsumer API. I understand this is the case you were talking about.



On Tue, Jul 10, 2018 at 12:16 PM Dong Lin  wrote:

> Hey Anna,
>
> Thanks much for your detailed explanation and example! It does help me
> understand the difference between our understanding.
>
> So it seems that the solution based on findOffsets() currently focuses
> mainly on the scenario that consumer has cached leaderEpoch -> offset
> mapping whereas I was thinking about the general case where consumer may or
> may not have this cache. I guess that is why we have different
> understanding here. I have some comments below.
>
>
> 3) The proposed solution using findOffsets(offset, leaderEpoch) followed by
> seek(offset) works if consumer has the cached leaderEpoch -> offset
> mapping. But if we assume consumer has this cache, do we need to have
> leaderEpoch in the findOffsets(...)? Intuitively, the findOffsets(offset)
> can also derive the leaderEpoch using offset just like the proposed
> solution does with seek(offset).
>
>
> 4) If consumer does not have cached leaderEpoch -> offset mapping, which is
> the case if consumer is restarted on a new machine, then it is not clear
> what leaderEpoch would be included in the FetchRequest if consumer does
> seek(offset). This is the case that motivates the first question of the
> previous email. In general, maybe we should discuss the final solution that
> covers all cases?
>
>
> 5) The second question in my previous email is related to the following
> paragraph:
>
> "... In some cases, offsets returned from position() could be actual
> consumed messages by this consumer identified by {offset, leader epoch}. In
> other cases, position() returns offset that was not actually consumed.
> Suppose, the user calls position() for the last offset...".
>
> I guess my point is that, if user calls position() for the last offset and
> uses that offset in seek(...), then user can probably just call
> Consumer#seekToEnd() without calling position() and seek(...). Similarly
> user can call Consumer#seekToBeginning() to seek to the earliest
> position without calling position() and seek(...). Thus position() only
> needs to return the actual consumed messages identified by {offset, leader
> epoch}. Does this make sense?
>
>
> Thanks,
> Dong
>
>
> On Mon, Jul 9, 2018 at 6:47 PM, Anna Povzner  wrote:
>
> > Hi Dong,
> >
> >
> > Thanks for considering my suggestions.
> >
> >
> > Based on your comments, I realized that my suggestion was not complete
> with
> > regard to KafkaConsumer API vs. consumer-broker protocol. While I propose
> > to keep KafkaConsumer#seek() unchanged and take offset only, the
> underlying
> > consumer will send the next FetchRequest() to broker with offset and
> > leaderEpoch if it is known (based on leader epoch cache in consumer) —
> note
> > that this is different from the current KIP, which suggests to always
> send
> > unknown leader epoch after seek(). This way, if the consumer and a broker
> > agreed on the point of non-divergence, which is some {offset,
> leaderEpoch}
> > pair, the new leader which causes another truncation (even further back)
> > will be able to detect new divergence and restart the process of finding
> > the new point of non-divergence. So, to answer your question, If the
> > truncation happens just after the user calls
> > KafkaConsumer#findOffsets(offset, leaderEpoch) followed by seek(offset),
> > the user will not seek to the wrong position without knowing that
> > truncation has happened, because the consumer will get another truncation
> > error, and seek again.
> >
> >
> > I am afraid, I did not understand your second question. Let me summarize
> my
> > suggestions again, and then give an example to hopefully make my
> > suggestions more clear. Also, the last part of my example shows how the
> > use-case in your first question will work. If it does not answer your
> > second question, would you mind clarifying? I am also focusing on the
> case
> > of a consumer having enough entries in the cache. The case of restarting
> > from committed offset either stored externally or internally will
> probably
> > need to be discussed more.
> >
> >
> > Let me summarize my suggestion again:
> >
> > 1) KafkaConsumer#seek() and KafkaConsumer#position() remains unchanged
> >
> > 2) New KafkaConsumer#findOffsets() takes {offset, 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-10 Thread Dong Lin
Hey Anna,

Thanks much for your detailed explanation and example! It does help me
understand the difference in our understanding.

So it seems that the solution based on findOffsets() currently focuses
mainly on the scenario that consumer has cached leaderEpoch -> offset
mapping whereas I was thinking about the general case where consumer may or
may not have this cache. I guess that is why we have a different
understanding here. I have some comments below.


3) The proposed solution using findOffsets(offset, leaderEpoch) followed by
seek(offset) works if consumer has the cached leaderEpoch -> offset
mapping. But if we assume consumer has this cache, do we need to have
leaderEpoch in the findOffsets(...)? Intuitively, the findOffsets(offset)
can also derive the leaderEpoch using offset just like the proposed
solution does with seek(offset).


4) If consumer does not have cached leaderEpoch -> offset mapping, which is
the case if consumer is restarted on a new machine, then it is not clear
what leaderEpoch would be included in the FetchRequest if consumer does
seek(offset). This is the case that motivates the first question of the
previous email. In general, maybe we should discuss the final solution that
covers all cases?


5) The second question in my previous email is related to the following
paragraph:

"... In some cases, offsets returned from position() could be actual
consumed messages by this consumer identified by {offset, leader epoch}. In
other cases, position() returns offset that was not actually consumed.
Suppose, the user calls position() for the last offset...".

I guess my point is that, if user calls position() for the last offset and
uses that offset in seek(...), then user can probably just call
Consumer#seekToEnd() without calling position() and seek(...). Similarly
user can call Consumer#seekToBeginning() to seek to the earliest
position without calling position() and seek(...). Thus position() only
needs to return the actual consumed messages identified by {offset, leader
epoch}. Does this make sense?
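
For what it's worth, the two cases contrasted here can already be sketched with the existing consumer API alone. This is a rough illustration (assuming the consumer is already assigned the partition tp); it uses only methods that exist today and none of the new APIs proposed in this KIP:

import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class SeekSketch {
    // "Seeking to a position": the earliest/latest positions are already covered
    // by the existing convenience methods, with no offset bookkeeping needed.
    static void seekToLatest(KafkaConsumer<byte[], byte[]> consumer, TopicPartition tp) {
        consumer.seekToEnd(Collections.singletonList(tp));
    }

    // "Seeking to a message": resuming at a concrete consumed position is where
    // position()/seek() (and, per this discussion, the leader epoch) matter.
    static void resumeAt(KafkaConsumer<byte[], byte[]> consumer, TopicPartition tp, long savedOffset) {
        consumer.seek(tp, savedOffset);
    }
}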


Thanks,
Dong


On Mon, Jul 9, 2018 at 6:47 PM, Anna Povzner  wrote:

> Hi Dong,
>
>
> Thanks for considering my suggestions.
>
>
> Based on your comments, I realized that my suggestion was not complete with
> regard to KafkaConsumer API vs. consumer-broker protocol. While I propose
> to keep KafkaConsumer#seek() unchanged and take offset only, the underlying
> consumer will send the next FetchRequest() to broker with offset and
> leaderEpoch if it is known (based on leader epoch cache in consumer) — note
> that this is different from the current KIP, which suggests to always send
> unknown leader epoch after seek(). This way, if the consumer and a broker
> agreed on the point of non-divergence, which is some {offset, leaderEpoch}
> pair, the new leader which causes another truncation (even further back)
> will be able to detect new divergence and restart the process of finding
> the new point of non-divergence. So, to answer your question, If the
> truncation happens just after the user calls
> KafkaConsumer#findOffsets(offset, leaderEpoch) followed by seek(offset),
> the user will not seek to the wrong position without knowing that
> truncation has happened, because the consumer will get another truncation
> error, and seek again.
>
>
> I am afraid, I did not understand your second question. Let me summarize my
> suggestions again, and then give an example to hopefully make my
> suggestions more clear. Also, the last part of my example shows how the
> use-case in your first question will work. If it does not answer your
> second question, would you mind clarifying? I am also focusing on the case
> of a consumer having enough entries in the cache. The case of restarting
> from committed offset either stored externally or internally will probably
> need to be discussed more.
>
>
> Let me summarize my suggestion again:
>
> 1) KafkaConsumer#seek() and KafkaConsumer#position() remains unchanged
>
> 2) New KafkaConsumer#findOffsets() takes {offset, leaderEpoch} pair per
> topic partition and returns offset per topic partition.
>
> 3) FetchRequest() to broker after KafkaConsumer#seek() will contain the
> offset set by seek and leaderEpoch that corresponds to the offset based on
> leader epoch cache in the consumer.
>
>
> The rest of this e-mail is a long and contrived example with several log
> truncations and unclean leader elections to illustrate the API and your
> first use-case. Suppose we have three brokers. Initially, Broker A, B, and
> C has one message at offset 0 with leader epoch 0. Then, Broker A goes down
> for some time. Broker B becomes a leader with epoch 1, and writes messages
> to offsets 1 and 2. Broker C fetches offset 1, but before fetching offset
> 2, becomes a leader with leader epoch 2 and writes a message at offset 2.
> Here is the state of brokers at this point:
>
> > Broker A:
> > offset 0, epoch 0 <— leader
> > goes down…
>
>
> > Broker B:
> > offset 0, 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-09 Thread Anna Povzner
Hi Dong,


Thanks for considering my suggestions.


Based on your comments, I realized that my suggestion was not complete with
regard to KafkaConsumer API vs. consumer-broker protocol. While I propose
to keep KafkaConsumer#seek() unchanged and take offset only, the underlying
consumer will send the next FetchRequest() to broker with offset and
leaderEpoch if it is known (based on leader epoch cache in consumer) — note
that this is different from the current KIP, which suggests to always send
unknown leader epoch after seek(). This way, if the consumer and a broker
agreed on the point of non-divergence, which is some {offset, leaderEpoch}
pair, the new leader which causes another truncation (even further back)
will be able to detect new divergence and restart the process of finding
the new point of non-divergence. So, to answer your question, If the
truncation happens just after the user calls
KafkaConsumer#findOffsets(offset, leaderEpoch) followed by seek(offset),
the user will not seek to the wrong position without knowing that
truncation has happened, because the consumer will get another truncation
error, and seek again.


I am afraid, I did not understand your second question. Let me summarize my
suggestions again, and then give an example to hopefully make my
suggestions more clear. Also, the last part of my example shows how the
use-case in your first question will work. If it does not answer your
second question, would you mind clarifying? I am also focusing on the case
of a consumer having enough entries in the cache. The case of restarting
from committed offset either stored externally or internally will probably
need to be discussed more.


Let me summarize my suggestion again:

1) KafkaConsumer#seek() and KafkaConsumer#position() remains unchanged

2) New KafkaConsumer#findOffsets() takes {offset, leaderEpoch} pair per
topic partition and returns offset per topic partition.

3) FetchRequest() to broker after KafkaConsumer#seek() will contain the
offset set by seek and leaderEpoch that corresponds to the offset based on
leader epoch cache in the consumer.


The rest of this e-mail is a long and contrived example with several log
truncations and unclean leader elections to illustrate the API and your
first use-case. Suppose we have three brokers. Initially, Broker A, B, and
C has one message at offset 0 with leader epoch 0. Then, Broker A goes down
for some time. Broker B becomes a leader with epoch 1, and writes messages
to offsets 1 and 2. Broker C fetches offset 1, but before fetching offset
2, becomes a leader with leader epoch 2 and writes a message at offset 2.
Here is the state of brokers at this point:

> Broker A:
> offset 0, epoch 0 <— leader
> goes down…


> Broker B:
> offset 0, epoch 0
> offset 1, epoch 1  <- leader
> offset 2, epoch 1



Broker C:
> offset 0, epoch 0
> offset 1, epoch 1
> offset 2, epoch 2 <— leader


Before Broker C becomes a leader with leader epoch 2, the consumer consumed
the following messages from broker A and broker B:

{offset=0, leaderEpoch=0}, {offset=1, leaderEpoch=1}, {offset=2,
leaderEpoch=1}.

Consumer’s leader epoch cache at this point contains the following entries:

(leaderEpoch=0, startOffset=0)

(leaderEpoch=1, startOffset=1)

endOffset = 3


Then, broker B becomes the follower of broker C, truncates and starts
fetching from offset 2.

Consumer sends fetchRequest(offset=3, leaderEpoch=1) and gets LOG_TRUNCATION
error from broker C.

In response, the client calls KafkaConsumer#findOffsets(offset=3,
leaderEpoch=1). The underlying consumer sends
OffsetsForLeaderEpoch(leaderEpoch=1), broker C responds with
{leaderEpoch=1, endOffset=2}. So, KafkaConsumer#findOffsets(offset=3,
leaderEpoch=1) returns offset=2.

In response, consumer calls KafkaConsumer@seek(offset=2) followed by
poll(), which results in FetchRequest(offset=2, leaderEpoch=1) to broker C.
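
Pulling the client-side part of this flow together, here is a rough sketch of the loop, assuming the APIs proposed in this thread existed. LogTruncationException, findOffsets and the simplified interfaces below are stand-ins for illustration only, not the final API shapes:

import org.apache.kafka.common.TopicPartition;

// Stand-in for the proposed consumer additions (hypothetical shapes, illustration only).
interface TruncationAwareConsumer {
    void poll() throws LogTruncationException;                          // simplified: fetch and dispatch records
    long findOffsets(TopicPartition tp, long offset, int leaderEpoch);  // proposed lookup of the non-divergence point
    void seek(TopicPartition tp, long offset);                          // existing-style seek by offset
}

// Hypothetical exception carrying the position that failed validation.
class LogTruncationException extends Exception {
    final TopicPartition partition;
    final long offset;          // offset the consumer tried to fetch
    final int lastLeaderEpoch;  // leader epoch of the last consumed message
    LogTruncationException(TopicPartition partition, long offset, int lastLeaderEpoch) {
        this.partition = partition;
        this.offset = offset;
        this.lastLeaderEpoch = lastLeaderEpoch;
    }
}

class TruncationHandlingLoop {
    // On a truncation error, look up the closest non-divergent offset, seek there and
    // retry; a second unclean election simply triggers the same handling again.
    static void pollOnce(TruncationAwareConsumer consumer) {
        while (true) {
            try {
                consumer.poll();
                return;
            } catch (LogTruncationException e) {
                long resumeOffset = consumer.findOffsets(e.partition, e.offset, e.lastLeaderEpoch);
                consumer.seek(e.partition, resumeOffset);
            }
        }
    }
}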


I will continue with this example with the goal to answer your first
question about truncation just after findOffsets() followed by seek():

Suppose, brokers B and C go down, and broker A comes up and becomes a
leader with leader epoch 3, and writes a message to offset 1. Suppose, this
happens before the consumer gets response from broker C to the previous
fetch request:  FetchRequest(offset=2, leaderEpoch=1).

Consumer re-sends FetchRequest(offset=2, leaderEpoch=1) to broker A, which
returns a LOG_TRUNCATION error, because broker A has a leader epoch (3) larger than
the leader epoch in the FetchRequest (1), and that larger epoch starts at offset 1,
which is earlier than the fetch offset 2.

In response, the user calls KafkaConsumer#findOffsets(offset=2,
leaderEpoch=1). The underlying consumer sends
OffsetsForLeaderEpoch(leaderEpoch=1), broker A responds with
{leaderEpoch=0, endOffset=1}; the underlying consumer finds leaderEpoch = 0
in its cache with end offset == 1, which results in
KafkaConsumer#findOffsets(offset=2, leaderEpoch=1) returning offset = 1.

In response, the user calls KafkaConsumer@seek(offset=1) followed by
poll(), which results in 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-07 Thread Dong Lin
Hey Anna,

Thanks much for the thoughtful reply. It makes sense to differentiate between
"seeking to a message" and "seeking to a position". I have two questions
here:

- For "seeking to a message" use-case, with the proposed approach user
needs to call findOffset(offset, leaderEpoch) followed by seek(offset). If
message truncation and message append happen immediately after
findOffset(offset,
leaderEpoch) but before seek(offset), it seems that user will seek to the
wrong message without knowing the truncation has happened. Would this be a
problem?

- For "seeking to a position" use-case, it seems that there can be two
positions, i.e. earliest and latest. So these two cases can be fulfilled by
Consumer.seekToBeginning() and Consumer.seekToEnd(). Then it
seems that user will only need to call position() and seek() for "seeking
to a message" use-case?

Thanks,
Dong


On Wed, Jul 4, 2018 at 12:33 PM, Anna Povzner  wrote:

> Hi Jason and Dong,
>
>
> I’ve been thinking about your suggestions and discussion regarding
> position(), seek(), and new proposed API.
>
>
> Here is my thought process why we should keep position() and seek() API
> unchanged.
>
>
> I think we should separate {offset, leader epoch} that uniquely identifies
> a message from an offset that is a position. In some cases, offsets
> returned from position() could be actual consumed messages by this consumer
> identified by {offset, leader epoch}. In other cases, position() returns
> offset that was not actually consumed. Suppose, the user calls position()
> for the last offset. Suppose we return {offset, leader epoch} of the
> message currently in the log. Then, the message gets truncated before
> consumer’s first poll(). It does not make sense for poll() to fail in this
> case, because the log truncation did not actually happen from the consumer
> perspective. On the other hand, as the KIP proposes, it makes sense for the
> committed() method to return {offset, leader epoch} because those offsets
> represent actual consumed messages.
>
>
> The same argument applies to the seek() method — we are not seeking to a
> message, we are seeking to a position.
>
>
> I like the proposal to add KafkaConsumer#findOffsets() API. I am assuming
> something like:
>
> Map<TopicPartition, Long> findOffsets(Map<TopicPartition, OffsetAndEpoch> offsetsToSearch)
>
> Similar to seek() and position(), I think findOffsets() should return
> offset without leader epoch, because what we want is the offset that we
> think is closest to the not divergent message from the given consumed
> message. Until the consumer actually fetches the message, we should not let
> the consumer store the leader epoch for a message it did not consume.
>
>
> So, the workflow will be:
>
> 1) The user gets LogTruncationException with {offset, leader epoch of the
> previous message} (whatever we send with new FetchRecords request).
>
> 2) offset = findOffsets(tp -> {offset, leader epoch})
>
> 3) seek(offset)
>
>
> For the use-case where the users store committed offsets externally:
>
> 1) Such users would have to track the leader epoch together with an offset.
> Otherwise, there is no way to detect later what leader epoch was associated
> with the message. I think it’s reasonable to ask that from users if they
> want to detect log truncation. Otherwise, they will get the current
> behavior.
>
>
> If the users currently get an offset to be stored using position(), I see
> two possibilities. First, they save the offset returned from position(),
> which they call before poll(). In that case, it would not be correct to
> store {offset, leader epoch} if we would have changed position() to return
> {offset, leader epoch} since actual fetched message could be different
> (from the example I described earlier). So, it would be more correct to
> call position() after poll(). However, the user already gets
> ConsumerRecords at this point, from which the user can extract {offset,
> leader epoch} of the last message.
>
>
> So, I like the idea of adding a helper method to ConsumerRecords, as Jason
> proposed, something like:
>
> public OffsetAndEpoch lastOffsetWithLeaderEpoch(), where OffsetAndEpoch is
> a data struct holding {offset, leader epoch}.
>
>
> In this case, we would advise the user to follow the workflow: poll(), get
> {offset, leader epoch} from ConsumerRecords#lastOffsetWithLeaderEpoch(),
> save offset and leader epoch, process records.
>
>
> 2) When the user needs to seek to the last committed offset, they call new
> findOffsets(saved offset, leader epoch), and then seek(offset).
>
>
> What do you think?
>
>
> Thanks,
>
> Anna
>
>
> On Tue, Jul 3, 2018 at 4:06 PM Dong Lin  wrote:
>
> > Hey Jason,
> >
> > Thanks much for your thoughtful explanation.
> >
> > Yes the solution using findOffsets(offset, leaderEpoch) also works. The
> > advantage of this solution it adds only one API instead of two APIs. The
> > concern is that its usage seems a bit more clumsy for advanced users.
> More
> > specifically, advanced users who store offsets externally will 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-06 Thread Jun Rao
Hi, Jason,

Thanks for the KIP. Looks good overall. Just a few minor comments below.

1. "As the consumer is fetching from a partition, it will keep a small
cache of the recent epochs that were fetched for each partition. " Do we
need to cache more than one leader epoch? Also, during consumer failover,
initially, only the last epoch will be available.

2. "This KIP has implications for the consumer's offset reset policy, which
defines what the consumer should do if its fetch offset becomes out of
range. With this KIP, the only case in which this is possible is if the
consumer fetches from an offset earlier than the log start offset. ". If
the fetch epoch matches that in the leader, but the offset is larger than
the leader's HW, should we still treat it as offset out of range?

3. "We propose in this KIP to change the behavior for both the "earliest"
and "latest" reset modes to do this automatically as long as the message
format supports lookup by leader epoch.  ". It will probably be useful to
indicate to the user that a reset has happened. So, it's probably useful to
at least log this in the client.

4. "If the user ignores the exception, we will continue fetching from the
current offset, but we will drop the last fetched offset metadata from the
new FetchRequest so that we do not get the same log truncation error." Is
it better to do this or keep failing?

5. LogTruncationException:  Should we add an error code for that?

6. "We have added fields for the leader epoch and the timestamp." It seems
that we only added the leader epoch?

Jun


On Mon, Jun 25, 2018 at 9:17 AM, Jason Gustafson  wrote:

> Hey All,
>
> I wrote up a KIP to handle one more edge case in the replication protocol
> and to support better handling of truncation in the consumer when unclean
> leader election is enabled. Let me know what you think.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation
>
> Thanks to Anna Povzner and Dong Lin for initial feedback.
>
> Thanks,
> Jason
>


Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-04 Thread Anna Povzner
Hi Jason and Dong,


I’ve been thinking about your suggestions and discussion regarding
position(), seek(), and new proposed API.


Here is my thought process why we should keep position() and seek() API
unchanged.


I think we should separate {offset, leader epoch} that uniquely identifies
a message from an offset that is a position. In some cases, offsets
returned from position() could be actual consumed messages by this consumer
identified by {offset, leader epoch}. In other cases, position() returns
offset that was not actually consumed. Suppose, the user calls position()
for the last offset. Suppose we return {offset, leader epoch} of the
message currently in the log. Then, the message gets truncated before
consumer’s first poll(). It does not make sense for poll() to fail in this
case, because the log truncation did not actually happen from the consumer
perspective. On the other hand, as the KIP proposes, it makes sense for the
committed() method to return {offset, leader epoch} because those offsets
represent actual consumed messages.


The same argument applies to the seek() method — we are not seeking to a
message, we are seeking to a position.


I like the proposal to add KafkaConsumer#findOffsets() API. I am assuming
something like:

Map<TopicPartition, Long> findOffsets(Map<TopicPartition, OffsetAndEpoch> offsetsToSearch)

Similar to seek() and position(), I think findOffsets() should return
offset without leader epoch, because what we want is the offset that we
think is closest to the non-divergent message from the given consumed
message. Until the consumer actually fetches the message, we should not let
the consumer store the leader epoch for a message it did not consume.


So, the workflow will be:

1) The user gets LogTruncationException with {offset, leader epoch of the
previous message} (whatever we send with new FetchRecords request).

2) offset = findOffsets(tp -> {offset, leader epoch})

3) seek(offset)


For the use-case where the users store committed offsets externally:

1) Such users would have to track the leader epoch together with an offset.
Otherwise, there is no way to detect later what leader epoch was associated
with the message. I think it’s reasonable to ask that from users if they
want to detect log truncation. Otherwise, they will get the current
behavior.


If the users currently get an offset to be stored using position(), I see
two possibilities. First, they save the offset returned from position(),
which they call before poll(). In that case, it would not be correct to
store {offset, leader epoch} if we would have changed position() to return
{offset, leader epoch} since actual fetched message could be different
(from the example I described earlier). So, it would be more correct to
call position() after poll(). However, the user already gets
ConsumerRecords at this point, from which the user can extract {offset,
leader epoch} of the last message.


So, I like the idea of adding a helper method to ConsumerRecords, as Jason
proposed, something like:

public OffsetAndEpoch lastOffsetWithLeaderEpoch(), where OffsetAndEpoch is
a data struct holding {offset, leader epoch}.


In this case, we would advise the user to follow the workflow: poll(), get
{offset, leader epoch} from ConsumerRecords#lastOffsetWithLeaderEpoch(),
save offset and leader epoch, process records.


2) When the user needs to seek to the last committed offset, they call new
findOffsets(saved offset, leader epoch), and then seek(offset).
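
As a rough illustration of that external-storage workflow: ConsumerRecords#lastOffsetWithLeaderEpoch, OffsetAndEpoch and findOffsets are only proposals in this thread, so the sketch below codes against stand-in interfaces rather than the real client, and details such as whether the helper is per partition are still open:

import java.util.Optional;
import org.apache.kafka.common.TopicPartition;

// Proposed pair of {offset, leader epoch}, as described above (hypothetical).
class OffsetAndEpoch {
    final long offset;
    final int leaderEpoch;
    OffsetAndEpoch(long offset, int leaderEpoch) { this.offset = offset; this.leaderEpoch = leaderEpoch; }
}

// Stand-ins for the proposed additions (illustration only, not the real API).
interface EpochAwareRecords {
    Optional<OffsetAndEpoch> lastOffsetWithLeaderEpoch(TopicPartition tp);  // proposed ConsumerRecords helper
}

interface EpochAwareConsumer {
    EpochAwareRecords poll();                                          // simplified poll
    long findOffsets(TopicPartition tp, OffsetAndEpoch lastConsumed);  // proposed non-divergence lookup
    void seek(TopicPartition tp, long offset);
}

// Whatever the user uses for external offset storage, e.g. a database (hypothetical).
interface ExternalOffsetStore {
    void save(TopicPartition tp, OffsetAndEpoch position);
    OffsetAndEpoch load(TopicPartition tp);
}

class ExternalOffsetWorkflow {
    // Steady state: poll, remember the last consumed {offset, leader epoch}, then process.
    static void consumeOnce(EpochAwareConsumer consumer, ExternalOffsetStore store, TopicPartition tp) {
        EpochAwareRecords records = consumer.poll();
        records.lastOffsetWithLeaderEpoch(tp).ifPresent(last -> store.save(tp, last));
        // ... process records ...
    }

    // Restart: translate the saved {offset, leader epoch} into a safe offset, then seek.
    static void restoreAfterRestart(EpochAwareConsumer consumer, ExternalOffsetStore store, TopicPartition tp) {
        OffsetAndEpoch saved = store.load(tp);
        long resumeOffset = consumer.findOffsets(tp, saved);
        consumer.seek(tp, resumeOffset);
    }
}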


What do you think?


Thanks,

Anna


On Tue, Jul 3, 2018 at 4:06 PM Dong Lin  wrote:

> Hey Jason,
>
> Thanks much for your thoughtful explanation.
>
> Yes the solution using findOffsets(offset, leaderEpoch) also works. The
> advantage of this solution it adds only one API instead of two APIs. The
> concern is that its usage seems a bit more clumsy for advanced users. More
> specifically, advanced users who store offsets externally will always need
> to call findOffsets() before calling seek(offset) during consumer
> initialization. And those advanced users will need to manually keep track
> of the leaderEpoch of the last ConsumerRecord.
>
> The other solution, which may be more user-friendly for advanced users, is to add
> two APIs, `void seek(offset, leaderEpoch)` and `(offset, epoch) =
> offsetEpochs(topicPartition)`.
>
> I kind of prefer the second solution because it is easier to use for
> advanced users. If we need to expose leaderEpoch anyway to safely identify
> a message, it may be conceptually simpler to expose it directly in
> seek(...) rather than requiring one more translation using
> findOffsets(...). But I am also OK with the first solution if other
> developers also favor that one :)
>
> Thanks,
> Dong
>
>
> On Mon, Jul 2, 2018 at 11:10 AM, Jason Gustafson 
> wrote:
>
> > Hi Dong,
> >
> > Thanks, I've been thinking about your suggestions a bit. It is
> challenging
> > to make this work given the current APIs. One of the difficulties is that
> > we don't have an API to find the leader epoch for a given offset at the

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-03 Thread Dong Lin
Hey Jason,

Thanks much for your thoughtful explanation.

Yes the solution using findOffsets(offset, leaderEpoch) also works. The
advantage of this solution is that it adds only one API instead of two APIs. The
concern is that its usage seems a bit more clumsy for advanced users. More
specifically, advanced users who store offsets externally will always need
to call findOffsets() before calling seek(offset) during consumer
initialization. And those advanced users will need to manually keep track
of the leaderEpoch of the last ConsumerRecord.

The other solution, which may be more user-friendly for advanced users, is to add
two APIs, `void seek(offset, leaderEpoch)` and `(offset, epoch) =
offsetEpochs(topicPartition)`.

I kind of prefer the second solution because it is easier to use for
advanced users. If we need to expose leaderEpoch anyway to safely identify
a message, it may be conceptually simpler to expose it directly in
seek(...) rather than requiring one more translation using
findOffsets(...). But I am also OK with the first solution if other
developers also favor that one :)

Thanks,
Dong


On Mon, Jul 2, 2018 at 11:10 AM, Jason Gustafson  wrote:

> Hi Dong,
>
> Thanks, I've been thinking about your suggestions a bit. It is challenging
> to make this work given the current APIs. One of the difficulties is that
> we don't have an API to find the leader epoch for a given offset at the
> moment. So if the user does a seek to offset 5, then we'll need a new API
> to find the corresponding epoch in order to fulfill the new position() API.
> Potentially we could modify ListOffsets to enable finding the leader epoch,
> but I am not sure it is worthwhile. Perhaps it is reasonable for advanced
> usage to expect that the epoch information, if needed, will be extracted
> from the records directly? It might make sense to expose a helper in
> `ConsumerRecords` to make this a little easier though.
>
> Alternatively, if we think it is important to have this information exposed
> directly, we could create batch APIs to solve the naming problem. For
> example:
>
> Map<TopicPartition, OffsetAndEpoch> positions();
> void seek(Map<TopicPartition, OffsetAndEpoch> positions);
>
> However, I'm actually leaning toward leaving the seek() and position() APIs
> unchanged. Instead, we can add a new API to search for offset by timestamp
> or by offset/leader epoch. Let's say we call it `findOffsets`. If the user
> hits a log truncation error, they can use this API to find the closest
> offset and then do a seek(). At the same time, we deprecate the
> `offsetsForTimes` APIs. We now have two use cases which require finding
> offsets, so I think we should make this API general and leave the door open
> for future extensions.
>
> By the way, I'm unclear about the desire to move part of this functionality
> to AdminClient. Guozhang suggested this previously, but I think it only
> makes sense for cross-cutting capabilities such as topic creation. If we
> have an API which is primarily useful by consumers, then I think that's
> where it should be exposed. The AdminClient also has its own API integrity
> and should not become a dumping ground for advanced use cases. I'll update
> the KIP with the  `findOffsets` API suggested above and we can see if it
> does a good enough job of keeping the API simple for common cases.
>
> Thanks,
> Jason
>
>
> On Sat, Jun 30, 2018 at 4:39 AM, Dong Lin  wrote:
>
> > Hey Jason,
> >
> > Regarding seek(...), it seems that we want an API for user to initialize
> > consumer with (offset, leaderEpoch) and that API should allow throwing
> > PartitionTruncationException. Suppose we agree on this, then
> > seekToNearest() is not sufficient because it will always swallow
> > PartitionTruncationException. Here we have two options. The first option
> is
> > to add API offsetsForLeaderEpochs() to translate (leaderEpoch, offset) to
> > offset. The second option is to have add seek(offset, leaderEpoch). It
> > seems that second option may be more simpler because it makes it clear
> that
> > (offset, leaderEpoch) will be used to identify consumer's position in a
> > partition. And user only needs to handle PartitionTruncationException
> from
> > the poll(). In comparison the first option seems a bit harder to use
> > because user have to also handle the PartitionTruncationException if
> > offsetsForLeaderEpochs() returns different offset from user-provided
> > offset. What do you think?
> >
> > If we decide to add API seek(offset, leaderEpoch), then we can decide
> > whether and how to add API to translate (offset, leaderEpoch) to offset.
> It
> > seems that this API will be needed by advanced user to don't want auto
> > offset reset (so that it can be notified) but still wants to reset offset
> > to closest. For those users if probably makes sense to only have the API
> in
> > AdminClient. offsetsForTimes() seems like a common API that will be
> needed
> > by user's of consumer in general, so it may be more reasonable to stay in
> > the consumer API. I don't have a strong opinion on whether
> 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-02 Thread Jason Gustafson
Hi Dong,

Thanks, I've been thinking about your suggestions a bit. It is challenging
to make this work given the current APIs. One of the difficulties is that
we don't have an API to find the leader epoch for a given offset at the
moment. So if the user does a seek to offset 5, then we'll need a new API
to find the corresponding epoch in order to fulfill the new position() API.
Potentially we could modify ListOffsets to enable finding the leader epoch,
but I am not sure it is worthwhile. Perhaps it is reasonable for advanced
usage to expect that the epoch information, if needed, will be extracted
from the records directly? It might make sense to expose a helper in
`ConsumerRecords` to make this a little easier though.

Alternatively, if we think it is important to have this information exposed
directly, we could create batch APIs to solve the naming problem. For
example:

Map<TopicPartition, OffsetAndEpoch> positions();
void seek(Map<TopicPartition, OffsetAndEpoch> positions);

However, I'm actually leaning toward leaving the seek() and position() APIs
unchanged. Instead, we can add a new API to search for offset by timestamp
or by offset/leader epoch. Let's say we call it `findOffsets`. If the user
hits a log truncation error, they can use this API to find the closest
offset and then do a seek(). At the same time, we deprecate the
`offsetsForTimes` APIs. We now have two use cases which require finding
offsets, so I think we should make this API general and leave the door open
for future extensions.
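
To illustrate the kind of generalization being suggested (purely a hypothetical shape; the KIP may end up with different names and types), such a lookup could accept either a timestamp or an {offset, leader epoch} per partition and resolve each to a plain offset to seek to:

import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Hypothetical query object: exactly one search mode is set per partition.
class OffsetQuery {
    final Long timestamp;      // search by timestamp (would subsume offsetsForTimes)
    final Long offset;         // search by offset + leader epoch (truncation handling)
    final Integer leaderEpoch;

    private OffsetQuery(Long timestamp, Long offset, Integer leaderEpoch) {
        this.timestamp = timestamp;
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }

    static OffsetQuery byTimestamp(long timestamp) {
        return new OffsetQuery(timestamp, null, null);
    }

    static OffsetQuery byOffsetAndEpoch(long offset, int leaderEpoch) {
        return new OffsetQuery(null, offset, leaderEpoch);
    }
}

// Hypothetical consumer addition: resolve each query to a concrete offset to seek to.
interface GeneralizedOffsetLookup {
    Map<TopicPartition, Long> findOffsets(Map<TopicPartition, OffsetQuery> queries);
}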

By the way, I'm unclear about the desire to move part of this functionality
to AdminClient. Guozhang suggested this previously, but I think it only
makes sense for cross-cutting capabilities such as topic creation. If we
have an API which is primarily useful by consumers, then I think that's
where it should be exposed. The AdminClient also has its own API integrity
and should not become a dumping ground for advanced use cases. I'll update
the KIP with the  `findOffsets` API suggested above and we can see if it
does a good enough job of keeping the API simple for common cases.

Thanks,
Jason


On Sat, Jun 30, 2018 at 4:39 AM, Dong Lin  wrote:

> Hey Jason,
>
> Regarding seek(...), it seems that we want an API for user to initialize
> consumer with (offset, leaderEpoch) and that API should allow throwing
> PartitionTruncationException. Suppose we agree on this, then
> seekToNearest() is not sufficient because it will always swallow
> PartitionTruncationException. Here we have two options. The first option is
> to add API offsetsForLeaderEpochs() to translate (leaderEpoch, offset) to
> offset. The second option is to have add seek(offset, leaderEpoch). It
> seems that second option may be more simpler because it makes it clear that
> (offset, leaderEpoch) will be used to identify consumer's position in a
> partition. And user only needs to handle PartitionTruncationException from
> the poll(). In comparison the first option seems a bit harder to use
> because user have to also handle the PartitionTruncationException if
> offsetsForLeaderEpochs() returns different offset from user-provided
> offset. What do you think?
>
> If we decide to add API seek(offset, leaderEpoch), then we can decide
> whether and how to add API to translate (offset, leaderEpoch) to offset. It
> seems that this API will be needed by advanced user to don't want auto
> offset reset (so that it can be notified) but still wants to reset offset
> to closest. For those users if probably makes sense to only have the API in
> AdminClient. offsetsForTimes() seems like a common API that will be needed
> by user's of consumer in general, so it may be more reasonable to stay in
> the consumer API. I don't have a strong opinion on whether
> offsetsForTimes() should be replaced by API in AdminClient.
>
> Though (offset, leaderEpoch) is needed to uniquely identify a message in
> general, it is only needed for advanced users who has turned on unclean
> leader election, need to use seek(..), and don't want auto offset reset.
> Most other users probably just want to enable auto offset reset and store
> offset in Kafka. Thus we might want to keep the existing offset-only APIs
> (e.g. seek() and position()) for most users while adding new APIs for
> advanced users. And yes, it seems that we need new name for position().
>
> Though I think we need new APIs to carry the new information (e.g.
> leaderEpoch), I am not very sure how that should look like. One possible
> option is those APIs in KIP-232. Another option is something like this:
>
> `
> class OffsetEpochs {
>   long offset;
>   int leaderEpoch;
>   int partitionEpoch;   // This may be needed later as discussed in KIP-232
>   ... // Hopefully these are all we need to identify message in Kafka. But
> if we need more then we can add new fields in this class.
> }
>
> OffsetEpochs offsetEpochs(TopicPartition);
>
> void seek(TopicPartition, OffsetEpochs);
> ``
>
>
> Thanks,
> Dong
>
>
> On Fri, Jun 29, 2018 at 11:13 AM, Jason Gustafson 
> wrote:
>
> > Hey Dong,
> 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-30 Thread Dong Lin
Hey Jason,

Regarding seek(...), it seems that we want an API for user to initialize
consumer with (offset, leaderEpoch) and that API should allow throwing
PartitionTruncationException. Suppose we agree on this, then
seekToNearest() is not sufficient because it will always swallow
PartitionTruncationException. Here we have two options. The first option is
to add API offsetsForLeaderEpochs() to translate (leaderEpoch, offset) to
offset. The second option is to add seek(offset, leaderEpoch). It
seems that the second option may be simpler because it makes it clear that
(offset, leaderEpoch) will be used to identify consumer's position in a
partition. And user only needs to handle PartitionTruncationException from
the poll(). In comparison the first option seems a bit harder to use
because user have to also handle the PartitionTruncationException if
offsetsForLeaderEpochs() returns different offset from user-provided
offset. What do you think?

If we decide to add API seek(offset, leaderEpoch), then we can decide
whether and how to add API to translate (offset, leaderEpoch) to offset. It
seems that this API will be needed by advanced users who don't want auto
offset reset (so that they can be notified) but still want to reset the offset
to the closest one. For those users it probably makes sense to only have the API in
the AdminClient. offsetsForTimes() seems like a common API that will be needed
by users of the consumer in general, so it may be more reasonable to stay in
the consumer API. I don't have a strong opinion on whether
offsetsForTimes() should be replaced by API in AdminClient.

Though (offset, leaderEpoch) is needed to uniquely identify a message in
general, it is only needed for advanced users who have turned on unclean
leader election, need to use seek(..), and don't want auto offset reset.
Most other users probably just want to enable auto offset reset and store
offset in Kafka. Thus we might want to keep the existing offset-only APIs
(e.g. seek() and position()) for most users while adding new APIs for
advanced users. And yes, it seems that we need a new name for position().

Though I think we need new APIs to carry the new information (e.g.
leaderEpoch), I am not very sure how that should look like. One possible
option is those APIs in KIP-232. Another option is something like this:

`
class OffsetEpochs {
  long offset;
  int leaderEpoch;
  int partitionEpoch;   // This may be needed later as discussed in KIP-232
  ... // Hopefully these are all we need to identify message in Kafka. But
if we need more then we can add new fields in this class.
}

OffsetEpochs offsetEpochs(TopicPartition);

void seek(TopicPartition, OffsetEpochs);
``
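
If it helps, here is a hypothetical usage sketch of the two calls above for the external-storage case. It assumes the class and methods sketched in the snippet existed (they do not today); OffsetEpochs is repeated here, minus the open-ended extra fields, only so the sketch is self-contained:

import org.apache.kafka.common.TopicPartition;

// Mirrors the class sketched above, without the open-ended extra fields (hypothetical).
class OffsetEpochs {
    final long offset;
    final int leaderEpoch;
    final int partitionEpoch;
    OffsetEpochs(long offset, int leaderEpoch, int partitionEpoch) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
        this.partitionEpoch = partitionEpoch;
    }
}

// Stand-in for the consumer additions sketched above (hypothetical).
interface EpochSeekingConsumer {
    OffsetEpochs offsetEpochs(TopicPartition tp);          // current position plus epochs
    void seek(TopicPartition tp, OffsetEpochs position);   // resume from a stored position
}

class StoreAndRestoreSketch {
    // Capture the position after consuming and hand it to external storage...
    static OffsetEpochs capture(EpochSeekingConsumer consumer, TopicPartition tp) {
        return consumer.offsetEpochs(tp);
    }

    // ...and on restart pass it straight back; with this shape, truncation would
    // surface later from poll() as a PartitionTruncationException rather than here.
    static void restore(EpochSeekingConsumer consumer, TopicPartition tp, OffsetEpochs stored) {
        consumer.seek(tp, stored);
    }
}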


Thanks,
Dong


On Fri, Jun 29, 2018 at 11:13 AM, Jason Gustafson 
wrote:

> Hey Dong,
>
> Thanks for the feedback. The first three points are easy:
>
> 1. Yes, we should be consistent.
> 2. Yes, I will add this.
> 3. Yes, I think we should document the changes to the committed offset
> schema. I meant to do this, but it slipped my mind.
>
> The latter questions are tougher. One option I was considering is to have
> only `offsetsForLeaderEpochs` exposed from the consumer and to drop the new
> seek() API. That seems more consistent with the current use of
> `offsetsForTimes` (we don't have a separate `seekToTimestamp` API). An
> alternative might be to take a page from the AdminClient API and add a new
> method to generalize offset lookup. For example, we could have
> `lookupOffsets(LookupOptions)`. We could then deprecate `offsetsForTimes`
> and this would open the door for future extensions without needing new
> APIs.
>
> The case of position() is a little more annoying. It would have been better
> had we let this return an object so that it is easier to extend. This is
> the only reason I didn't add the API to the KIP. Maybe we should bite the
> bullet and fix this now? Unfortunately we'll have to come up with a new
> name. Maybe `currentPosition`?
>
> Thoughts?
>
> -Jason
>
>
> On Fri, Jun 29, 2018 at 10:40 AM, Dong Lin  wrote:
>
> > Regarding points 4) and 5) above, motivation for the alternative APIs is
> > that, if we decide that leaderEpoch is equally important as offset in
> > identifying a message, then it may be reasonable to always specify it
> > wherever offset is currently required in the consumer API to identify a
> > message, e.g. position(), seek(). For example, since we allow user to
> > retrieve offset using position() instead of asking user to keep track of
> > the offset of the latest ConsumerRecord, may be it will be more
> consistent
> > for user to also retrieve  leaderEpoch using position()?
> >
> >
> >
> > On Fri, Jun 29, 2018 at 10:30 AM, Dong Lin  wrote:
> >
> > > Hey Jason,
> > >
> > > Thanks for the update. It looks pretty good. Just some minor comments
> > > below:
> > >
> > > 1) The KIP adds new error code "LOG_TRUNCATION" and new exception
> > TruncatedPartitionException.
> > > Can we make the name more consistent, e.g. 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-29 Thread Jason Gustafson
Hey Dong,

Thanks for the feedback. The first three points are easy:

1. Yes, we should be consistent.
2. Yes, I will add this.
3. Yes, I think we should document the changes to the committed offset
schema. I meant to do this, but it slipped my mind.

The latter questions are tougher. One option I was considering is to have
only `offsetsForLeaderEpochs` exposed from the consumer and to drop the new
seek() API. That seems more consistent with the current use of
`offsetsForTimes` (we don't have a separate `seekToTimestamp` API). An
alternative might be to take a page from the AdminClient API and add a new
method to generalize offset lookup. For example, we could have
`lookupOffsets(LookupOptions)`. We could then deprecate `offsetsForTimes`
and this would open the door for future extensions without needing new APIs.

The case of position() is a little more annoying. It would have been better
had we let this return an object so that it is easier to extend. This is
the only reason I didn't add the API to the KIP. Maybe we should bite the
bullet and fix this now? Unfortunately we'll have to come up with a new
name. Maybe `currentPosition`?

Thoughts?

-Jason


On Fri, Jun 29, 2018 at 10:40 AM, Dong Lin  wrote:

> Regarding points 4) and 5) above, motivation for the alternative APIs is
> that, if we decide that leaderEpoch is equally important as offset in
> identifying a message, then it may be reasonable to always specify it
> wherever offset is currently required in the consumer API to identify a
> message, e.g. position(), seek(). For example, since we allow user to
> retrieve offset using position() instead of asking user to keep track of
> the offset of the latest ConsumerRecord, may be it will be more consistent
> for user to also retrieve  leaderEpoch using position()?
>
>
>
> On Fri, Jun 29, 2018 at 10:30 AM, Dong Lin  wrote:
>
> > Hey Jason,
> >
> > Thanks for the update. It looks pretty good. Just some minor comments
> > below:
> >
> > 1) The KIP adds new error code "LOG_TRUNCATION" and new exception
> TruncatedPartitionException.
> > Can we make the name more consistent, e.g. LogTruncationException?
> >
> > 2) Do we need to add UnknownLeaderEpochException as part of API change?
> >
> > 3) Not sure if the offset topic schema is also public API. If so, maybe
> we
> > should also include the schema change in the API?
> >
> > 4) For users who store offset externally, currently they get offset using
> > position(..), store the offset externally, and use seek(..) to initialize
> > the consumer next time. After this KIP they will need to store and use
> the
> > leaderEpoch together with the offset. Should we also update the API so
> that
> > user can also get leaderEpoch from position(...)? Not sure if it is OK to
> > ask user to track the latest leaderEpoch of ConsumerRecord by themselves.
> >
> > 5) Also for users who store offset externally, they need to call seek(..)
> > with leaderEpoch to initialize consumer. With current KIP users need to
> > call seekToNearest(), whose name suggests that the final position may be
> > different from what was requested. However, if users may want to avoid
> auto
> > offset reset and be notified explicitly when there is log truncation,
> then seekToNearest()
> > probably does not help here. Would it make sense to replace
> seekToNearest()
> > with seek(offset, leaderEpoch) + AdminClient.offsetsForLeaderEpochs(...)?
> >
> >
> > Thanks,
> > Dong
> >
> >
> > On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson 
> > wrote:
> >
> >> Hey Guozhang,
> >>
> >> That's fair. In fact, perhaps we do not need this API at all. We already
> >> have the new seek() in this KIP which can do the lookup based on epoch
> for
> >> this use case. I guess we should probably call it seekToNearest() though
> >> to
> >> make it clear that the final position may be different from what was
> >> requested.
> >>
> >> Thanks,
> >> Jason
> >>
> >> On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang 
> >> wrote:
> >>
> >> > Hi Jason,
> >> >
> >> > I think it is less worthwhile to add KafkaConsumer#offsetsForLeaderEpochs,
> >> > since probably only very advanced users are aware of the leaderEpoch,
> >> and
> >> > hence ever care to use it anyways. It is more like an admin client
> >> > operation than a consumer client operation: if the motivation is to
> >> > facilitate a customized reset policy, maybe adding it as
> >> > AdminClient#offsetsForLeaderEpochs
> >> > is better as it is not an aggressive assumption that for such advanced
> >> > users they are willing to use some admin client to get further
> >> information?
> >> >
> >> >
> >> > Guozhang
> >> >
> >> >
> >> > On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson 
> >> > wrote:
> >> >
> >> > > Thanks for the feedback. I've updated the KIP. Specifically I
> removed
> >> the
> >> > > "closest" reset option and the proposal to reset by timestamp when
> the
> >> > > precise truncation point cannot be determined. Instead, I proposed
> >> that
> >> > we
> >> > > 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-29 Thread Dong Lin
Regarding points 4) and 5) above, the motivation for the alternative APIs is
that, if we decide that the leaderEpoch is as important as the offset in
identifying a message, then it may be reasonable to always specify it
wherever an offset is currently required in the consumer API to identify a
message, e.g. position(), seek(). For example, since we allow the user to
retrieve the offset using position() instead of asking the user to keep track
of the offset of the latest ConsumerRecord, maybe it would be more consistent
for the user to also retrieve the leaderEpoch using position()?
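
To make that alternative concrete, a purely hypothetical sketch (none of
these names are proposed anywhere; OffsetAndEpoch and positionWithEpoch()
are just placeholders for whatever shape such an API could take):

    // Hypothetical only: an epoch-aware variant of position().
    TopicPartition tp = new TopicPartition("my-topic", 0);
    OffsetAndEpoch pos = consumer.positionWithEpoch(tp);   // placeholder name
    long nextOffset = pos.offset();                        // next offset to consume
    int lastEpoch = pos.leaderEpoch();                     // epoch of the last consumed record
    // Both values could then be checkpointed together in external storage,
    // instead of tracking the epoch from each ConsumerRecord by hand.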



On Fri, Jun 29, 2018 at 10:30 AM, Dong Lin  wrote:

> Hey Jason,
>
> Thanks for the update. It looks pretty good. Just some minor comments
> below:
>
> 1) The KIP adds new error code "LOG_TRUNCATION" and new exception 
> TruncatedPartitionException.
> Can we make the name more consistent, e.g. LogTruncationException?
>
> 2) Do we need to add UnknownLeaderEpochException as part of API change?
>
> 3) Not sure if the offset topic schema is also public API. If so, maybe we
> should also include the schema change in the API?
>
> 4) For users who store offset externally, currently they get offset using
> position(..), store the offset externally, and use seek(..) to initialize
> the consumer next time. After this KIP they will need to store and use the
> leaderEpoch together with the offset. Should we also update the API so that
> user can also get leaderEpoch from position(...)? Not sure if it is OK to
> ask user to track the latest leaderEpoch of ConsumerRecord by themselves.
>
> 5) Also for users who store offset externally, they need to call seek(..)
> with leaderEpoch to initialize consumer. With current KIP users need to
> call seekToNearest(), whose name suggests that the final position may be
> different from what was requested. However, if users may want to avoid auto
> offset reset and be notified explicitly when there is log truncation, then 
> seekToNearest()
> probably does not help here. Would it make sense to replace seekToNearest()
> with seek(offset, leaderEpoch) + AdminClient.offsetsForLeaderEpochs(...)?
>
>
> Thanks,
> Dong
>
>
> On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson 
> wrote:
>
>> Hey Guozhang,
>>
>> That's fair. In fact, perhaps we do not need this API at all. We already
>> have the new seek() in this KIP which can do the lookup based on epoch for
>> this use case. I guess we should probably call it seekToNearest() though
>> to
>> make it clear that the final position may be different from what was
>> requested.
>>
>> Thanks,
>> Jason
>>
>> On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang 
>> wrote:
>>
>> > Hi Jason,
>> >
>> > I think it is less worthwhile to add KafkaConsumer#offsetsForLeader
>> Epochs,
>> > since probably only very advanced users are aware of the leaderEpoch,
>> and
>> > hence would ever care to use it anyway. It is more like an admin client
>> > operation than a consumer client operation: if the motivation is to
>> > facilitate a customized reset policy, maybe adding it as
>> > AdminClient#offsetsForLeaderEpochs
>> > is better as it is not an aggressive assumption that for such advanced
>> > users they are willing to use some admin client to get further
>> information?
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson 
>> > wrote:
>> >
>> > > Thanks for the feedback. I've updated the KIP. Specifically I removed
>> the
>> > > "closest" reset option and the proposal to reset by timestamp when the
>> > > precise truncation point cannot be determined. Instead, I proposed
>> that
>> > we
>> > > always reset using the nearest epoch when a reset policy is defined
>> > (either
>> > > "earliest" or "latest"). Does that sound reasonable?
>> > >
>> > > One thing I am still debating is whether it would be better to have a
>> > > separate API to find the closest offset using the leader epoch. In the
>> > > current KIP, I suggested to piggyback this information on an
>> exception,
>> > but
>> > > I'm beginning to think it would be better not to hide the lookup. It
>> is
>> > > awkward to implement since it means delaying the exception and the API
>> > may
>> > > actually be useful when customizing reset logic if no auto reset
>> policy
>> > is
>> > > defined. I was thinking we can add an API like the following:
>> > >
>> > > Map
>> > > offsetsForLeaderEpochs(Map epochsToSearch)
>> > >
>> > > Thoughts?
>> > >
>> > > -Jason
>> > >
>> > >
>> > >
>> > >
>> > > On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson > >
>> > > wrote:
>> > >
>> > > > @Dong
>> > > >
>> > > > Those are fair points. Both approaches require some fuzziness to
>> reset
>> > > the
>> > > > offset in these pathological scenarios and we cannot guarantee
>> > > > at-least-once delivery either way unless we have the full history of
>> > > leader
>> > > > epochs that were consumed. The KIP-101 logic may actually be more
>> > > accurate
>> > > > than using timestamps because it does not depend on the messages
>> which
>> > > are
>> > > > 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-29 Thread Dong Lin
Hey Jason,

Thanks for the update. It looks pretty good. Just some minor comments below:

1) The KIP adds the new error code "LOG_TRUNCATION" and the new exception
TruncatedPartitionException. Can we make the names more consistent, e.g.
LogTruncationException?

2) Do we need to add UnknownLeaderEpochException as part of the API change?

3) Not sure if the offset topic schema is also a public API. If so, maybe we
should also include the schema change in the API?

4) For users who store offsets externally, currently they get the offset
using position(..), store the offset externally, and use seek(..) to
initialize the consumer next time. After this KIP they will need to store
and use the leaderEpoch together with the offset. Should we also update the
API so that the user can also get the leaderEpoch from position(...)? Not
sure if it is OK to ask the user to track the latest leaderEpoch of the
ConsumerRecord themselves.

5) Also, for users who store offsets externally, they need to call seek(..)
with the leaderEpoch to initialize the consumer. With the current KIP, users
need to call seekToNearest(), whose name suggests that the final position
may be different from what was requested. However, if users want to avoid
auto offset reset and be notified explicitly when there is log truncation,
then seekToNearest() probably does not help here. Would it make sense to
replace seekToNearest() with seek(offset, leaderEpoch) +
AdminClient.offsetsForLeaderEpochs(...)?
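
To make points 4) and 5) concrete, here is a rough sketch of the external
checkpoint pattern I have in mind (process(), saveExternally() and the epoch
accessor are placeholders, not proposed API):

    // Consuming side: checkpoint the next offset (and, after this KIP, the
    // leaderEpoch) in external storage instead of committing to Kafka.
    Map<TopicPartition, Long> offsets = new HashMap<>();
    Map<TopicPartition, Integer> epochs = new HashMap<>();
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);                                    // user logic (placeholder)
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        offsets.put(tp, record.offset() + 1);               // next offset to consume
        epochs.put(tp, record.leaderEpoch());               // placeholder: however the KIP exposes it
    }
    saveExternally(offsets, epochs);                        // user-provided store (placeholder)

    // On restart: today only the offset can be restored via seek(); with this
    // KIP the stored epoch would also need to be supplied (e.g. through
    // seekToNearest() or a seek(offset, leaderEpoch) overload) so that log
    // truncation can be detected.
    consumer.seek(somePartition, restoredOffset);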


Thanks,
Dong


On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson  wrote:

> Hey Guozhang,
>
> That's fair. In fact, perhaps we do not need this API at all. We already
> have the new seek() in this KIP which can do the lookup based on epoch for
> this use case. I guess we should probably call it seekToNearest() though to
> make it clear that the final position may be different from what was
> requested.
>
> Thanks,
> Jason
>
> On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang  wrote:
>
> > Hi Jason,
> >
> > I think it is less worthwhile to add KafkaConsumer#
> offsetsForLeaderEpochs,
> > since probably only very advanced users are aware of the leaderEpoch, and
> > hence would ever care to use it anyway. It is more like an admin client
> > operation than a consumer client operation: if the motivation is to
> > facilitate a customized reset policy, maybe adding it as
> > AdminClient#offsetsForLeaderEpochs
> > is better as it is not an aggressive assumption that for such advanced
> > users they are willing to use some admin client to get further
> information?
> >
> >
> > Guozhang
> >
> >
> > On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson 
> > wrote:
> >
> > > Thanks for the feedback. I've updated the KIP. Specifically I removed
> the
> > > "closest" reset option and the proposal to reset by timestamp when the
> > > precise truncation point cannot be determined. Instead, I proposed that
> > we
> > > always reset using the nearest epoch when a reset policy is defined
> > (either
> > > "earliest" or "latest"). Does that sound reasonable?
> > >
> > > One thing I am still debating is whether it would be better to have a
> > > separate API to find the closest offset using the leader epoch. In the
> > > current KIP, I suggested to piggyback this information on an exception,
> > but
> > > I'm beginning to think it would be better not to hide the lookup. It is
> > > awkward to implement since it means delaying the exception and the API
> > may
> > > actually be useful when customizing reset logic if no auto reset policy
> > is
> > > defined. I was thinking we can add an API like the following:
> > >
> > > Map
> > > offsetsForLeaderEpochs(Map epochsToSearch)
> > >
> > > Thoughts?
> > >
> > > -Jason
> > >
> > >
> > >
> > >
> > > On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson 
> > > wrote:
> > >
> > > > @Dong
> > > >
> > > > Those are fair points. Both approaches require some fuzziness to
> reset
> > > the
> > > > offset in these pathological scenarios and we cannot guarantee
> > > > at-least-once delivery either way unless we have the full history of
> > > leader
> > > > epochs that were consumed. The KIP-101 logic may actually be more
> > > accurate
> > > > than using timestamps because it does not depend on the messages
> which
> > > are
> > > > written after the unclean leader election. The case we're talking
> about
> > > > should be extremely rare in practice anyway. I also agree that we may
> > not
> > > > want to add new machinery if it only helps the old message format.
> Ok,
> > > > let's go ahead and drop the timestamp.
> > > >
> > > > @Guozhang
> > > >
> > > > * My current understanding is that, with unclean leader election
> turned
> > > on,
> > > >> exactly-once is out of the window since we cannot guarantee that all
> > > >> committed message markers will not be lost. And hence there is no
> need
> > > to
> > > >> have special handling logic for LOG_TRUNCATED or OOR error codes
> with
> > > >> read.committed turned on. Is that right?
> > > >
> > > >
> > > > Yes, that's right. EoS and unclean leader 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-27 Thread Guozhang Wang
Sounds good to me.

On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson  wrote:

> Hey Guozhang,
>
> That's fair. In fact, perhaps we do not need this API at all. We already
> have the new seek() in this KIP which can do the lookup based on epoch for
> this use case. I guess we should probably call it seekToNearest() though to
> make it clear that the final position may be different from what was
> requested.
>
> Thanks,
> Jason
>
> On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang  wrote:
>
> > Hi Jason,
> >
> > I think it is less worthwhile to add KafkaConsumer#
> offsetsForLeaderEpochs,
> > since probably only very advanced users are aware of the leaderEpoch, and
> > hence would ever care to use it anyway. It is more like an admin client
> > operation than a consumer client operation: if the motivation is to
> > facilitate a customized reset policy, maybe adding it as
> > AdminClient#offsetsForLeaderEpochs
> > is better as it is not an aggressive assumption that for such advanced
> > users they are willing to use some admin client to get further
> information?
> >
> >
> > Guozhang
> >
> >
> > On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson 
> > wrote:
> >
> > > Thanks for the feedback. I've updated the KIP. Specifically I removed
> the
> > > "closest" reset option and the proposal to reset by timestamp when the
> > > precise truncation point cannot be determined. Instead, I proposed that
> > we
> > > always reset using the nearest epoch when a reset policy is defined
> > (either
> > > "earliest" or "latest"). Does that sound reasonable?
> > >
> > > One thing I am still debating is whether it would be better to have a
> > > separate API to find the closest offset using the leader epoch. In the
> > > current KIP, I suggested to piggyback this information on an exception,
> > but
> > > I'm beginning to think it would be better not to hide the lookup. It is
> > > awkward to implement since it means delaying the exception and the API
> > may
> > > actually be useful when customizing reset logic if no auto reset policy
> > is
> > > defined. I was thinking we can add an API like the following:
> > >
> > > Map
> > > offsetsForLeaderEpochs(Map epochsToSearch)
> > >
> > > Thoughts?
> > >
> > > -Jason
> > >
> > >
> > >
> > >
> > > On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson 
> > > wrote:
> > >
> > > > @Dong
> > > >
> > > > Those are fair points. Both approaches require some fuzziness to
> reset
> > > the
> > > > offset in these pathological scenarios and we cannot guarantee
> > > > at-least-once delivery either way unless we have the full history of
> > > leader
> > > > epochs that were consumed. The KIP-101 logic may actually be more
> > > accurate
> > > > than using timestamps because it does not depend on the messages
> which
> > > are
> > > > written after the unclean leader election. The case we're talking
> about
> > > > should be extremely rare in practice anyway. I also agree that we may
> > not
> > > > want to add new machinery if it only helps the old message format.
> Ok,
> > > > let's go ahead and drop the timestamp.
> > > >
> > > > @Guozhang
> > > >
> > > > * My current understanding is that, with unclean leader election
> turned
> > > on,
> > > >> exactly-once is out of the window since we cannot guarantee that all
> > > >> committed message markers will not be lost. And hence there is no
> need
> > > to
> > > >> have special handling logic for LOG_TRUNCATED or OOR error codes
> with
> > > >> read.committed turned on. Is that right?
> > > >
> > > >
> > > > Yes, that's right. EoS and unclean leader election don't mix well. It
> > may
> > > > be worth considering separately whether we should try to reconcile
> the
> > > > transaction log following an unclean leader election. At least we may
> > be
> > > > able to prevent dangling transactions from blocking consumers. This
> KIP
> > > > does not address this problem.
> > > >
> > > > * MINOR: "if the epoch is greater than the minimum expected epoch,
> that
> > > the
> > > >> new epoch does not begin at an earlier offset than the fetch offset.
> > In
> > > >> the latter case, the leader can respond with a new LOG_TRUNCATION
> > error
> > > >> code" should it be "does not begin at a later offset than the fetch
> > > >> offset"?
> > > >
> > > >
> > > > I think the comment is correct, though the phrasing may be confusing.
> > We
> > > > know truncation has occurred if there exists a larger epoch with a
> > > starting
> > > > offset that is lower than the fetch offset. Let me try to rephrase
> > this.
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang 
> > > wrote:
> > > >
> > > >> Jason, thanks for the KIP. A few comments:
> > > >>
> > > >> * I think Dong's question about whether to use timestamp-based
> > approach
> > > >> v.s. start-offset-of-first-larger-epoch is valid; more
> specifically,
> > > with
> > > >> timestamp-based approach we may still be reseting to an offset
> falling
> > > >> into
> > > >> the 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-27 Thread Jason Gustafson
Hey Guozhang,

That's fair. In fact, perhaps we do not need this API at all. We already
have the new seek() in this KIP which can do the lookup based on epoch for
this use case. I guess we should probably call it seekToNearest() though to
make it clear that the final position may be different from what was
requested.
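
For what it's worth, usage would then look roughly like this (the exact
signature is still open, so the arguments are only illustrative):

    // Sketch only: restore a position from an externally stored checkpoint.
    consumer.seekToNearest(partition, storedOffset, storedLeaderEpoch);
    // If the log was truncated, the resulting position may be earlier than
    // storedOffset, so the application should check consumer.position(partition)
    // rather than assume it resumed exactly where it left off.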

Thanks,
Jason

On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang  wrote:

> Hi Jason,
>
> I think it is less worthwhile to add KafkaConsumer#offsetsForLeaderEpochs,
> since probably only very advanced users are aware of the leaderEpoch, and
> hence would ever care to use it anyway. It is more like an admin client
> operation than a consumer client operation: if the motivation is to
> facilitate a customized reset policy, maybe adding it as
> AdminClient#offsetsForLeaderEpochs
> is better as it is not an aggressive assumption that for such advanced
> users they are willing to use some admin client to get further information?
>
>
> Guozhang
>
>
> On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson 
> wrote:
>
> > Thanks for the feedback. I've updated the KIP. Specifically I removed the
> > "closest" reset option and the proposal to reset by timestamp when the
> > precise truncation point cannot be determined. Instead, I proposed that
> we
> > always reset using the nearest epoch when a reset policy is defined
> (either
> > "earliest" or "latest"). Does that sound reasonable?
> >
> > One thing I am still debating is whether it would be better to have a
> > separate API to find the closest offset using the leader epoch. In the
> > current KIP, I suggested to piggyback this information on an exception,
> but
> > I'm beginning to think it would be better not to hide the lookup. It is
> > awkward to implement since it means delaying the exception and the API
> may
> > actually be useful when customizing reset logic if no auto reset policy
> is
> > defined. I was thinking we can add an API like the following:
> >
> > Map
> > offsetsForLeaderEpochs(Map epochsToSearch)
> >
> > Thoughts?
> >
> > -Jason
> >
> >
> >
> >
> > On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson 
> > wrote:
> >
> > > @Dong
> > >
> > > Those are fair points. Both approaches require some fuzziness to reset
> > the
> > > offset in these pathological scenarios and we cannot guarantee
> > > at-least-once delivery either way unless we have the full history of
> > leader
> > > epochs that were consumed. The KIP-101 logic may actually be more
> > accurate
> > > than using timestamps because it does not depend on the messages which
> > are
> > > written after the unclean leader election. The case we're talking about
> > > should be extremely rare in practice anyway. I also agree that we may
> not
> > > want to add new machinery if it only helps the old message format. Ok,
> > > let's go ahead and drop the timestamp.
> > >
> > > @Guozhang
> > >
> > > * My current understanding is that, with unclean leader election turned
> > on,
> > >> exactly-once is out of the window since we cannot guarantee that all
> > >> committed message markers will not be lost. And hence there is no need
> > to
> > >> have special handling logic for LOG_TRUNCATED or OOR error codes with
> > >> read.committed turned on. Is that right?
> > >
> > >
> > > Yes, that's right. EoS and unclean leader election don't mix well. It
> may
> > > be worth considering separately whether we should try to reconcile the
> > > transaction log following an unclean leader election. At least we may
> be
> > > able to prevent dangling transactions from blocking consumers. This KIP
> > > does not address this problem.
> > >
> > > * MINOR: "if the epoch is greater than the minimum expected epoch, that
> > the
> > >> new epoch does not begin at an earlier offset than the fetch offset.
> In
> > >> the latter case, the leader can respond with a new LOG_TRUNCATION
> error
> > >> code" should it be "does not begin at a later offset than the fetch
> > >> offset"?
> > >
> > >
> > > I think the comment is correct, though the phrasing may be confusing.
> We
> > > know truncation has occurred if there exists a larger epoch with a
> > starting
> > > offset that is lower than the fetch offset. Let me try to rephrase
> this.
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang 
> > wrote:
> > >
> > >> Jason, thanks for the KIP. A few comments:
> > >>
> > >> * I think Dong's question about whether to use timestamp-based
> approach
> > >> v.s. start-offset-of-first-larger-epoch is valid; more specifically,
> > with
> > >> timestamp-based approach we may still be resetting to an offset falling
> > >> into
> > >> the truncated interval, and hence we may still miss some data, i.e.
> not
> > >> guaranteeing at-least-once still. With the
> > >> start-offset-of-first-larger-epoch, I'm not sure if it will guarantee
> > no
> > >> valid data is missed when we have consecutive log truncations (maybe
> we
> > >> need to look back into details of KIP-101 to figure it out). If the
> > latter

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-27 Thread Guozhang Wang
Hi Jason,

I think it is less worthwhile to add KafkaConsumer#offsetsForLeaderEpochs,
since probably only very advanced users are aware of the leaderEpoch, and
hence would ever care to use it anyway. It is more like an admin client
operation than a consumer client operation: if the motivation is to
facilitate a customized reset policy, maybe adding it as
AdminClient#offsetsForLeaderEpochs is better, since it is not an aggressive
assumption that such advanced users would be willing to use the admin client
to get further information?


Guozhang


On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson  wrote:

> Thanks for the feedback. I've updated the KIP. Specifically I removed the
> "closest" reset option and the proposal to reset by timestamp when the
> precise truncation point cannot be determined. Instead, I proposed that we
> always reset using the nearest epoch when a reset policy is defined (either
> "earliest" or "latest"). Does that sound reasonable?
>
> One thing I am still debating is whether it would be better to have a
> separate API to find the closest offset using the leader epoch. In the
> current KIP, I suggested to piggyback this information on an exception, but
> I'm beginning to think it would be better not to hide the lookup. It is
> awkward to implement since it means delaying the exception and the API may
> actually be useful when customizing reset logic if no auto reset policy is
> defined. I was thinking we can add an API like the following:
>
> Map
> offsetsForLeaderEpochs(Map epochsToSearch)
>
> Thoughts?
>
> -Jason
>
>
>
>
> On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson 
> wrote:
>
> > @Dong
> >
> > Those are fair points. Both approaches require some fuzziness to reset
> the
> > offset in these pathological scenarios and we cannot guarantee
> > at-least-once delivery either way unless we have the full history of
> leader
> > epochs that were consumed. The KIP-101 logic may actually be more
> accurate
> > than using timestamps because it does not depend on the messages which
> are
> > written after the unclean leader election. The case we're talking about
> > should be extremely rare in practice anyway. I also agree that we may not
> > want to add new machinery if it only helps the old message format. Ok,
> > let's go ahead and drop the timestamp.
> >
> > @Guozhang
> >
> > * My current understanding is that, with unclean leader election turned
> on,
> >> exactly-once is out of the window since we cannot guarantee that all
> >> committed message markers will not be lost. And hence there is no need
> to
> >> have special handling logic for LOG_TRUNCATED or OOR error codes with
> >> read.committed turned on. Is that right?
> >
> >
> > Yes, that's right. EoS and unclean leader election don't mix well. It may
> > be worth considering separately whether we should try to reconcile the
> > transaction log following an unclean leader election. At least we may be
> > able to prevent dangling transactions from blocking consumers. This KIP
> > does not address this problem.
> >
> > * MINOR: "if the epoch is greater than the minimum expected epoch, that
> the
> >> new epoch does not begin at an earlier offset than the fetch offset.  In
> >> the latter case, the leader can respond with a new LOG_TRUNCATION error
> >> code" should it be "does not begin at a later offset than the fetch
> >> offset"?
> >
> >
> > I think the comment is correct, though the phrasing may be confusing. We
> > know truncation has occurred if there exists a larger epoch with a
> starting
> > offset that is lower than the fetch offset. Let me try to rephrase this.
> >
> > Thanks,
> > Jason
> >
> > On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang 
> wrote:
> >
> >> Jason, thanks for the KIP. A few comments:
> >>
> >> * I think Dong's question about whether to use timestamp-based approach
> >> v.s. start-offset-of-first-larger-epoch is valid; more specifically,
> with
> >> timestamp-based approach we may still be resetting to an offset falling
> >> into
> >> the truncated interval, and hence we may still miss some data, i.e. not
> >> guaranteeing at-least-once still. With the
> >> start-offset-of-first-larger-epoch, I'm not sure if it will guarantee
> no
> >> valid data is missed when we have consecutive log truncations (maybe we
> >> need to look back into details of KIP-101 to figure it out). If the
> latter
> >> can indeed guarantee at least once, we could consider using that
> approach.
> >>
> >> * My current understanding is that, with unclean leader election turned
> >> on,
> >> exactly-once is out of the window since we cannot guarantee that all
> >> committed message markers will not be lost. And hence there is no need
> to
> >> have special handling logic for LOG_TRUNCATED or OOR error codes with
> >> read.committed turned on. Is that right?
> >>
> >> * MINOR: "if the epoch is greater than the minimum expected epoch, that
> >> the
> >> new epoch does not begin at an earlier offset than the fetch offset.  In
> >> the latter 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-27 Thread Jason Gustafson
Thanks for the feedback. I've updated the KIP. Specifically I removed the
"closest" reset option and the proposal to reset by timestamp when the
precise truncation point cannot be determined. Instead, I proposed that we
always reset using the nearest epoch when a reset policy is defined (either
"earliest" or "latest"). Does that sound reasonable?

One thing I am still debating is whether it would be better to have a
separate API to find the closest offset using the leader epoch. In the
current KIP, I suggested piggybacking this information on an exception, but
I'm beginning to think it would be better not to hide the lookup. It is
awkward to implement since it means delaying the exception, and the API may
actually be useful for customizing reset logic if no auto reset policy is
defined. I was thinking we could add an API like the following:

Map
offsetsForLeaderEpochs(Map epochsToSearch)
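
Roughly, the intended usage would be something like the following (the
key/value types are only illustrative here and still TBD; OffsetAndEpoch is
a placeholder):

    // Custom reset handling when no auto.offset.reset policy is configured.
    Map<TopicPartition, Integer> epochsToSearch =
        Collections.singletonMap(partition, lastConsumedEpoch);
    Map<TopicPartition, OffsetAndEpoch> divergence =
        consumer.offsetsForLeaderEpochs(epochsToSearch);
    // Seek to the point where the local view of the log diverged from the
    // leader's, i.e. the end offset of the last consumed epoch.
    consumer.seek(partition, divergence.get(partition).offset());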

Thoughts?

-Jason




On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson 
wrote:

> @Dong
>
> Those are fair points. Both approaches require some fuzziness to reset the
> offset in these pathological scenarios and we cannot guarantee
> at-least-once delivery either way unless we have the full history of leader
> epochs that were consumed. The KIP-101 logic may actually be more accurate
> than using timestamps because it does not depend on the messages which are
> written after the unclean leader election. The case we're talking about
> should be extremely rare in practice anyway. I also agree that we may not
> want to add new machinery if it only helps the old message format. Ok,
> let's go ahead and drop the timestamp.
>
> @Guozhang
>
> * My current understanding is that, with unclean leader election turned on,
>> exactly-once is out of the window since we cannot guarantee that all
>> committed message markers will not be lost. And hence there is no need to
>> have special handling logic for LOG_TRUNCATED or OOR error codes with
>> read.committed turned on. Is that right?
>
>
> Yes, that's right. EoS and unclean leader election don't mix well. It may
> be worth considering separately whether we should try to reconcile the
> transaction log following an unclean leader election. At least we may be
> able to prevent dangling transactions from blocking consumers. This KIP
> does not address this problem.
>
> * MINOR: "if the epoch is greater than the minimum expected epoch, that the
>> new epoch does not begin at an earlier offset than the fetch offset.  In
>> the latter case, the leader can respond with a new LOG_TRUNCATION error
>> code" should it be "does not begin at a later offset than the fetch
>> offset"?
>
>
> I think the comment is correct, though the phrasing may be confusing. We
> know truncation has occurred if there exists a larger epoch with a starting
> offset that is lower than the fetch offset. Let me try to rephrase this.
>
> Thanks,
> Jason
>
> On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang  wrote:
>
>> Jason, thanks for the KIP. A few comments:
>>
>> * I think Dong's question about whether to use timestamp-based approach
>> v.s. start-offset-of-first-larger-epoch is valid; more specifically, with
>> timestamp-based approach we may still be resetting to an offset falling
>> into
>> the truncated interval, and hence we may still miss some data, i.e. not
>> guaranteeing at-least-once still. With the
>> start-offset-of-first-larger-epoch, I'm not sure if it will guarantee no
>> valid data is missed when we have consecutive log truncations (maybe we
>> need to look back into details of KIP-101 to figure it out). If the latter
>> can indeed guarantee at least once, we could consider using that approach.
>>
>> * My current understanding is that, with unclean leader election turned
>> on,
>> exactly-once is out of the window since we cannot guarantee that all
>> committed message markers will not be lost. And hence there is no need to
>> have special handling logic for LOG_TRUNCATED or OOR error codes with
>> read.committed turned on. Is that right?
>>
>> * MINOR: "if the epoch is greater than the minimum expected epoch, that
>> the
>> new epoch does not begin at an earlier offset than the fetch offset.  In
>> the latter case, the leader can respond with a new LOG_TRUNCATION error
>> code" should it be "does not begin at a later offset than the fetch
>> offset"?
>>
>>
>>
>> Guozhang
>>
>>
>> On Tue, Jun 26, 2018 at 6:51 PM, Dong Lin  wrote:
>>
>> > Hey Jason,
>> >
>> > Thanks for the explanation.
>> >
>> > Please correct me if this is wrong. The "unknown truncation offset"
>> > scenario happens when consumer does not have the full leaderEpoch ->
>> offset
>> > mapping. In this case we can still use the KIP-101-based approach to
>> > truncate offset to "start offset of the first Leader Epoch larger than
>> last
>> > epoch of the consumer" but it may be inaccurate. So the KIP chooses to
>> use
>> > the timestamp-based approach which is also best-effort.
>> >
>> > If this understanding is correct, for "closest" offset 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-27 Thread Jason Gustafson
@Dong

Those are fair points. Both approaches require some fuzziness to reset the
offset in these pathological scenarios and we cannot guarantee
at-least-once delivery either way unless we have the full history of leader
epochs that were consumed. The KIP-101 logic may actually be more accurate
than using timestamps because it does not depend on the messages which are
written after the unclean leader election. The case we're talking about
should be extremely rare in practice anyway. I also agree that we may not
want to add new machinery if it only helps the old message format. Ok,
let's go ahead and drop the timestamp.

@Guozhang

* My current understanding is that, with unclean leader election turned on,
> exactly-once is out of the window since we cannot guarantee that all
> committed message markers will not be lost. And hence there is no need to
> have special handling logic for LOG_TRUNCATED or OOR error codes with
> read.committed turned on. Is that right?


Yes, that's right. EoS and unclean leader election don't mix well. It may
be worth considering separately whether we should try to reconcile the
transaction log following an unclean leader election. At least we may be
able to prevent dangling transactions from blocking consumers. This KIP
does not address this problem.

* MINOR: "if the epoch is greater than the minimum expected epoch, that the
> new epoch does not begin at an earlier offset than the fetch offset.  In
> the latter case, the leader can respond with a new LOG_TRUNCATION error
> code" should it be "does not begin at a later offset than the fetch
> offset"?


I think the comment is correct, though the phrasing may be confusing. We
know truncation has occurred if there exists a larger epoch with a starting
offset that is lower than the fetch offset. Let me try to rephrase this.
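
In rough pseudocode (EpochEntry is just shorthand for an (epoch, startOffset)
pair in the leader's epoch cache, not an actual class name), the check is:

    // Returns true if the fetcher's log has diverged from the leader's log.
    boolean truncationDetected(int fetchEpoch, long fetchOffset,
                               List<EpochEntry> leaderEpochCache) {
        for (EpochEntry entry : leaderEpochCache) {
            // A larger epoch that starts before the fetch offset means the
            // fetcher's epoch cannot extend up to that offset on this log.
            if (entry.epoch > fetchEpoch && entry.startOffset < fetchOffset)
                return true;
        }
        return false;
    }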

Thanks,
Jason

On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang  wrote:

> Jason, thanks for the KIP. A few comments:
>
> * I think Dong's question about whether to use timestamp-based approach
> v.s. start-offset-of-first-larger-epoch is valid; more specifically, with
> timestamp-based approach we may still be resetting to an offset falling into
> the truncated interval, and hence we may still miss some data, i.e. not
> guaranteeing at-least-once still. With the
> start-offset-of-first-larger-epoch, I'm not sure if it will guarantee no
> valid data is missed when we have consecutive log truncations (maybe we
> need to look back into details of KIP-101 to figure it out). If the latter
> can indeed guarantee at least once, we could consider using that approach.
>
> * My current understanding is that, with unclean leader election turned on,
> exactly-once is out of the window since we cannot guarantee that all
> committed message markers will not be lost. And hence there is no need to
> have special handling logic for LOG_TRUNCATED or OOR error codes with
> read.committed turned on. Is that right?
>
> * MINOR: "if the epoch is greater than the minimum expected epoch, that the
> new epoch does not begin at an earlier offset than the fetch offset.  In
> the latter case, the leader can respond with a new LOG_TRUNCATION error
> code" should it be "does not begin at a later offset than the fetch
> offset"?
>
>
>
> Guozhang
>
>
> On Tue, Jun 26, 2018 at 6:51 PM, Dong Lin  wrote:
>
> > Hey Jason,
> >
> > Thanks for the explanation.
> >
> > Please correct me if this is wrong. The "unknown truncation offset"
> > scenario happens when consumer does not have the full leaderEpoch ->
> offset
> > mapping. In this case we can still use the KIP-101-based approach to
> > truncate offset to "start offset of the first Leader Epoch larger than
> last
> > epoch of the consumer" but it may be inaccurate. So the KIP chooses to
> use
> > the timestamp-based approach which is also best-effort.
> >
> > If this understanding is correct, for "closest" offset reset policy and
> > "unknown truncation offset" scenario, I am wondering whether it maybe
> > better to replace timestamp-based approach with KIP-101 based approach.
> In
> > comparison to timestamp-based approach, the KIP-101-based approach seems
> to
> > simplify the API a bit since user does not need to understand timestamp.
> > Similar to the timestamp-based approach, both approaches are best-effort
> > and do not guarantee that consumer can consume all messages. It is not
> like
> > KIP-279 which guarantees that follower broker can consume all messages
> from
> > the leader.
> >
> > Then it seems that the remaining difference is mostly about accuracy,
> i.e.
> > how much message will be duplicated or missed in the "unknown truncation
> > offset" scenario. Not sure either one is clearly better than the other.
> > Note that there are two scenarios mentioned in KIP-279 which are not
> > addressed by KIP-101. Both scenarios require quick leadership change
> > between brokers, which seems to suggest that the offset obtained
> > by "start
> > offset of the first Leader Epoch 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-27 Thread Guozhang Wang
Jason, thanks for the KIP. A few comments:

* I think Dong's question about whether to use the timestamp-based approach
vs. start-offset-of-first-larger-epoch is valid; more specifically, with the
timestamp-based approach we may still be resetting to an offset falling into
the truncated interval, and hence we may still miss some data, i.e. still not
guaranteeing at-least-once. With start-offset-of-first-larger-epoch, I'm not
sure if it will guarantee that no valid data is missed when we have
consecutive log truncations (maybe we need to look back into the details of
KIP-101 to figure it out). If the latter can indeed guarantee at-least-once,
we could consider using that approach.

* My current understanding is that, with unclean leader election turned on,
exactly-once is out of the window since we cannot guarantee that all
committed message markers will not be lost. And hence there is no need to
have special handling logic for LOG_TRUNCATED or OOR error codes with
read.committed turned on. Is that right?

* MINOR: "if the epoch is greater than the minimum expected epoch, that the
new epoch does not begin at an earlier offset than the fetch offset.  In
the latter case, the leader can respond with a new LOG_TRUNCATION error
code" should it be "does not begin at a later offset than the fetch offset"?



Guozhang


On Tue, Jun 26, 2018 at 6:51 PM, Dong Lin  wrote:

> Hey Jason,
>
> Thanks for the explanation.
>
> Please correct me if this is wrong. The "unknown truncation offset"
> scenario happens when consumer does not have the full leaderEpoch -> offset
> mapping. In this case we can still use the KIP-101-based approach to
> truncate offset to "start offset of the first Leader Epoch larger than last
> epoch of the consumer" but it may be inaccurate. So the KIP chooses to use
> the timestamp-based approach which is also best-effort.
>
> If this understanding is correct, for "closest" offset reset policy and
> "unknown truncation offset" scenario, I am wondering whether it maybe
> better to replace timestamp-based approach with KIP-101 based approach. In
> comparison to timestamp-based approach, the KIP-101-based approach seems to
> simplify the API a bit since user does not need to understand timestamp.
> Similar to the timestamp-based approach, both approaches are best-effort
> and do not guarantee that consumer can consume all messages. It is not like
> KIP-279 which guarantees that follower broker can consume all messages from
> the leader.
>
> Then it seems that the remaining difference is mostly about accuracy, i.e.
> how much message will be duplicated or missed in the "unknown truncation
> offset" scenario. Not sure either one is clearly better than the other.
> Note that there are two scenarios mentioned in KIP-279 which are not
> addressed by KIP-101. Both scenarios require quick leadership change
> between brokers, which seems to suggest that the offset obtained
> by "start
> offset of the first Leader Epoch larger than last epoch of the consumer"
> under these two scenarios may be very close to the offset obtained by the
> message timestamp. Does this sound reasonable?
>
> Good point that users on v1 format can get benefit with timestamp based
> approach. On the other hand it seems like a short term benefit for users
> who have not migrated. I am just not sure whether it is more important than
> designing a better API.
>
> Also, for both "latest" and "earliest" reset policy, do you think it would
> make sense to also use the KIP-101 based approach to truncate offset for
> the "unknown truncation offset" scenario?
>
>
> Thanks,
> Dong
>



-- 
-- Guozhang


Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-26 Thread Dong Lin
Hey Jason,

Thanks for the explanation.

Please correct me if this is wrong. The "unknown truncation offset"
scenario happens when the consumer does not have the full leaderEpoch ->
offset mapping. In this case we can still use the KIP-101-based approach to
truncate the offset to the "start offset of the first Leader Epoch larger
than the last epoch of the consumer", but it may be inaccurate. So the KIP
chooses to use the timestamp-based approach, which is also best-effort.

If this understanding is correct, for the "closest" offset reset policy and
the "unknown truncation offset" scenario, I am wondering whether it may be
better to replace the timestamp-based approach with the KIP-101-based
approach. In comparison to the timestamp-based approach, the KIP-101-based
approach seems to simplify the API a bit since the user does not need to
understand timestamps. Like the timestamp-based approach, it is best-effort
and does not guarantee that the consumer can consume all messages. It is not
like KIP-279, which guarantees that the follower broker can consume all
messages from the leader.

Then it seems that the remaining difference is mostly about accuracy, i.e.
how many messages will be duplicated or missed in the "unknown truncation
offset" scenario. I am not sure either one is clearly better than the other.
Note that there are two scenarios mentioned in KIP-279 which are not
addressed by KIP-101. Both scenarios require quick leadership changes
between brokers, which seems to suggest that the offset obtained by "start
offset of the first Leader Epoch larger than the last epoch of the consumer"
under these two scenarios may be very close to the offset obtained by the
message timestamp. Does this sound reasonable?

Good point that users on the v1 format can get some benefit from the
timestamp-based approach. On the other hand, it seems like a short-term
benefit for users who have not migrated. I am just not sure whether it is
more important than designing a better API.

Also, for both the "latest" and "earliest" reset policies, do you think it
would make sense to also use the KIP-101-based approach to truncate the
offset for the "unknown truncation offset" scenario?


Thanks,
Dong


Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-26 Thread Jason Gustafson
The other thing I forgot to mention is that resetting the offset using the
leader epoch is only available with the latest message format. By
supporting reset by timestamp, users on the v1 format can still get some
benefit from this KIP.

-Jason

On Tue, Jun 26, 2018 at 11:47 AM, Jason Gustafson 
wrote:

> Hey Dong,
>
> Thanks for the comments.
>
> - The KIP says that, with auto.offset.reset="closest", timestamp is used to
>> find offset if truncation offset is unknown. It seems that if consumer
>> knows the timestamp of the last message, then the consumer should also
>> know
>> the (offset, leaderEpoch) of the last message which can then be used for
>> find the truncation offset. Can you explain why truncation offset is
>> unknown in this case?
>
>
> The intent of the new reset policy is to automatically locate the closest
> offset within the limits of Kafka log semantics. Unlike replicas,
> consumers do not know the full history of leader epochs that have been
> previously read. In some scenarios, they may not be able to precisely find
> the offset where the log diverged after a sequence of unclean leader
> elections (see KIP-279 for more detail). It seemed unfortunate in these
> cases to have to resort to the coarse-grained resetting using either the
> earliest or latest offset. Using the timestamp, we can find a more accurate
> reset point and minimize the amount of loss or duplication.
>
> - How does the consumer differentiate between "Offset out of range (too low)"
>> and "Offset out of range (unknown truncation offset)", i.e. the two
>> columns
>> in the table provided in the KIP?
>
>
> We know when an offset is too low because we have the start offset of the
> log from the fetch response. Following this KIP, that should really be the
> only time we get an OutOfRange error (other than buggy application code).
> The other two cases are distinguished based on whether we are able to find
> the right offset of divergence.
>
> - It is probably a typo. Maybe fix "This is not the last The" in the
>> Proposed Section.
>
>
> Thanks. Magnus noticed this too and I fixed it earlier this morning. Good
> to know who's actually reading the proposal!
>
> -Jason
>
>
>
> On Tue, Jun 26, 2018 at 11:09 AM, Dong Lin  wrote:
>
>> Hey Jason,
>>
>> Thanks for the KIP! It is pretty useful.
>>
>> At high level the new set of reset policies may be a bit complicated and
>> confusing to users. I am wondering whether we can simplify it. A few
>> questions below:
>>
>> - The KIP says that, with auto.offset.reset="closest", timestamp is used
>> to
>> find offset if truncation offset is unknown. It seems that if consumer
>> knows the timestamp of the last message, then the consumer should also
>> know
>> the (offset, leaderEpoch) of the last message which can then be used for
>> find the truncation offset. Can you explain why truncation offset is
>> unknown in this case?
>>
>> - How does the consumer differentiate between "Offset out of range (too low)"
>> and "Offset out of range (unknown truncation offset)", i.e. the two
>> columns
>> in the table provided in the KIP?
>>
>> - It is probably a typo. Maybe fix "This is not the last The" in the
>> Proposed Section.
>>
>>
>> Thanks,
>> Dong
>>
>> On Mon, Jun 25, 2018 at 9:17 AM, Jason Gustafson 
>> wrote:
>>
>> > Hey All,
>> >
>> > I wrote up a KIP to handle one more edge case in the replication
>> protocol
>> > and to support better handling of truncation in the consumer when
>> unclean
>> > leader election is enabled. Let me know what you think.
>> >
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A
>> > +Allow+fetchers+to+detect+and+handle+log+truncation
>> >
>> > Thanks to Anna Povzner and Dong Lin for initial feedback.
>> >
>> > Thanks,
>> > Jason
>> >
>>
>
>


Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-26 Thread Jason Gustafson
Hey Dong,

Thanks for the comments.

- The KIP says that, with auto.offset.reset="closest", timestamp is used to
> find offset if truncation offset is unknown. It seems that if consumer
> knows the timestamp of the last message, then the consumer should also know
> the (offset, leaderEpoch) of the last message which can then be used for
> find the truncation offset. Can you explain why truncation offset is
> unknown in this case?


The intent of the new reset policy is to automatically locate the closest
offset within the limits of Kafka log semantics. Unlike replicas, consumers
do not know the full history of leader epochs that have been previously
read. In some scenarios, they may not be able to precisely find the offset
where the log diverged after a sequence of unclean leader elections (see
KIP-279 for more detail). It seemed unfortunate in these cases to have to
resort to the coarse-grained resetting using either the earliest or latest
offset. Using the timestamp, we can find a more accurate reset point and
minimize the amount of loss or duplication.

- How does the consumer differentiate between "Offset out of range (too low)"
> and "Offset out of range (unknown truncation offset)", i.e. the two columns
> in the table provided in the KIP?


We know when an offset is too low because we have the start offset of the
log from the fetch response. Following this KIP, that should really be the
only time we get an OutOfRange error (other than buggy application code).
The other two cases are distinguished based on whether we are able to find
the right offset of divergence.
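
As a rough sketch of the consumer-side handling (illustrative only, not
actual client code):

    if (fetchOffset < logStartOffset) {
        // "Offset out of range (too low)": the data was deleted, so apply the
        // configured reset policy.
    } else if (divergentOffset != null) {
        // Truncation with a known point of divergence: seek there, or surface
        // it to the application through the truncation exception.
    } else {
        // Truncation detected, but the point of divergence could not be
        // determined.
    }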

- It is probably a typo. Maybe fix "This is not the last The" in the
> Proposed Section.


Thanks. Magnus noticed this too and I fixed it earlier this morning. Good
to know who's actually reading the proposal!

-Jason



On Tue, Jun 26, 2018 at 11:09 AM, Dong Lin  wrote:

> Hey Jason,
>
> Thanks for the KIP! It is pretty useful.
>
> At high level the new set of reset policies may be a bit complicated and
> confusing to users. I am wondering whether we can simplify it. A few
> questions below:
>
> - The KIP says that, with auto.offset.reset="closest", timestamp is used to
> find offset if truncation offset is unknown. It seems that if consumer
> knows the timestamp of the last message, then the consumer should also know
> the (offset, leaderEpoch) of the last message which can then be used for
> find the truncation offset. Can you explain why truncation offset is
> unknown in this case?
>
> - How does the consumer differentiate between "Offset out of range (too low)"
> and "Offset out of range (unknown truncation offset)", i.e. the two columns
> in the table provided in the KIP?
>
> - It is probably a typo. Maybe fix "This is not the last The" in the
> Proposed Section.
>
>
> Thanks,
> Dong
>
> On Mon, Jun 25, 2018 at 9:17 AM, Jason Gustafson 
> wrote:
>
> > Hey All,
> >
> > I wrote up a KIP to handle one more edge case in the replication protocol
> > and to support better handling of truncation in the consumer when unclean
> > leader election is enabled. Let me know what you think.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A
> > +Allow+fetchers+to+detect+and+handle+log+truncation
> >
> > Thanks to Anna Povzner and Dong Lin for initial feedback.
> >
> > Thanks,
> > Jason
> >
>


Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-06-26 Thread Dong Lin
Hey Jason,

Thanks for the KIP! It is pretty useful.

At a high level, the new set of reset policies may be a bit complicated and
confusing to users. I am wondering whether we can simplify it. A few
questions below:

- The KIP says that, with auto.offset.reset="closest", the timestamp is used
to find the offset if the truncation offset is unknown. It seems that if the
consumer knows the timestamp of the last message, then the consumer should
also know the (offset, leaderEpoch) of the last message, which can then be
used to find the truncation offset. Can you explain why the truncation offset
is unknown in this case?

- How does the consumer differentiate between "Offset out of range (too low)"
and "Offset out of range (unknown truncation offset)", i.e. the two columns
in the table provided in the KIP?

- It is probably a typo. Maybe fix "This is not the last The" in the
Proposed Section.


Thanks,
Dong

On Mon, Jun 25, 2018 at 9:17 AM, Jason Gustafson  wrote:

> Hey All,
>
> I wrote up a KIP to handle one more edge case in the replication protocol
> and to support better handling of truncation in the consumer when unclean
> leader election is enabled. Let me know what you think.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A
> +Allow+fetchers+to+detect+and+handle+log+truncation
>
> Thanks to Anna Povzner and Dong Lin for initial feedback.
>
> Thanks,
> Jason
>