Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Matthias J. Sax
Thanks.

I buy the argument about the lag for active tasks.

Nit: The KIP briefly mentions the deprecation of the `metadataForKey()`
methods -- those should be listed as `@deprecated` next to the newly
added methods to call this change out more visibly.

Nit: in the code example, why do we loop over `inSyncStandbys`? Would
we not just query the most up-to-date one?
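For illustration, something like this is what I had in mind (a rough sketch
only; `lagFor` is a placeholder for however the example obtains the propagated
lag estimate, not a proposed API):

    import java.util.Collection;
    import java.util.Comparator;
    import java.util.Optional;
    import java.util.function.ToLongFunction;
    import org.apache.kafka.streams.state.HostInfo;

    final class StandbyPicker {
        // Pick the single most caught-up standby instead of querying each one
        // in a loop; `lagFor` is a placeholder for the app's own lag lookup.
        static Optional<HostInfo> mostUpToDate(final Collection<HostInfo> inSyncStandbys,
                                               final ToLongFunction<HostInfo> lagFor) {
            return inSyncStandbys.stream().min(Comparator.comparingLong(lagFor));
        }
    }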

Nit: Section "Compatibility, Deprecation, and Migration Plan" should
point out that two methods are deprecated and user can migrate their
code to use the two new methods instead.

Those nits only address the write-up of the KIP, not the actual design,
which LGTM.


-Matthias




On 11/14/19 3:48 PM, Guozhang Wang wrote:
> 10/20: I think I'm aligned with John's replies as well.
> 
> Guozhang
> 
> On Fri, Nov 15, 2019 at 1:45 AM Vinoth Chandar 
> wrote:
> 
>>> during restoring state the active might have some lag
>>
>> Great catch, yes.. we cannot assume lag = 0 for active. Lets report active
>> lag as well then. If active is too laggy, the app can then deem the store
>> partition unavailable (based on what the application is willing to
>> tolerate).
>>
>> @matthias do you agree? We can then begin the vote.
>>
>> On Thu, Nov 14, 2019 at 9:03 AM Navinder Brar
>>  wrote:
>>
>>> I agree with Vinoth and John on having "allLocalStoreOffsetLags()", all
>>> actives don't have 0 lag, as during restoring state the active might have
>>> some lag and one of the features of this KIP is to provide an option to
>>> query from active (which might be in restoring state).
>>> I will update the KIP with rejected alternatives and post this will start
>>> a vote if everyone agrees on this.
>>> On Thursday, 14 November, 2019, 09:34:52 pm IST, John Roesler <
>>> j...@confluent.io> wrote:
>>>
>>>  Hi all,
>>>
>>> Thanks for the "reset", Vinoth. It brings some needed clarity to the
>>> discussion.
>>>
>>> 10. My 2 cents: we might as well include the lags for the active
>>> copies as well. This is a more context-free API. If we only include
>>> standbys, this choice won't make sense to users unless they understand
>>> that the active task cannot lag in the steady state, since it's the
>>> source of updates. This isn't a bad thing to realize, but it's just
>>> more mental overhead for the person who wants to list the lags for
>>> "all local stores".
>>>
>>> Another reason is that we could consider also reporting the lag for
>>> actives during recovery (when they would have non-zero lag). We don't
>>> have to now, but if we choose to call the method "standby lags", then
>>> we can't make this choice in the future.
>>>
>>> That said, it's just my opinion. I'm fine either way.
>>>
>>> 20. Vinoth's reply makes sense to me, fwiw.
>>>
>>> Beyond these two points, I'm happy with the current proposal.
>>>
>>> Thanks again,
>>> -John
>>>
>>> On Thu, Nov 14, 2019 at 4:48 AM Vinoth Chandar 
>>> wrote:

 10. I considered that. Had to pick one or the other. Can just return
 standby too and rename method to may be “allLocalStandbyOffsetLags()”
>> to
 have it explicit. (Standby should implicitly convey that we are talking
 about stores)

 20. What I meant was, we are returning HostInfo instead of
>>> StreamsMetadata
 since thats sufficient to route query; same for “int partition “ vs
>> topic
 partition before. Previously KeyQueryMetadata had similar structure but
 used StreamsMetadata and TopicPartition objects to convey same
>>> information

 @navinder KIP is already upto date with the email I sent, except for
>> the
 reasonings I was laying out. +1 on revisiting rejected alternatives.
 Please make the follow up changes

 On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax >>
 wrote:

> Thanks for the summary Vinoth!
>
> I buy the overall argument. Couple of clarification questions:
>
>
> 10. Why do we need to include the active stores in
> `allLocalStoreOffsetLags()`? Would it not be simpler to just return
>> lag
> for standbys?
>
>
> 20: What does
>
>> Thin the KeyQueryMetadata object to just contain the minimum
>>> information
>> needed.
>
> exaclty mean? What is the "minimum information needed" ?
>
>
> @Navinder: if you agree, can you update the KIP accoringly? With all
>>> the
> proposals, it's hard to keep track and it would be great to have the
> current proposal summarized in the wiki page.
>
> Please also update the "Rejected alternative" sections to avoid that
>> we
> cycle back to old proposal (including the reason _why_ they got
>>> rejected).
>
>
> Thanks a lot!
>
>
> -Matthias
>
>
>
> On 11/13/19 7:10 PM, Vinoth Chandar wrote:
>> Given we have had a healthy discussion on this topic for a month
>> now
>>> and
>> still with many loose ends and open ended conversations, I thought
>> It
> would
>> be worthwhile to take a step back and re-evaluate 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Guozhang Wang
10/20: I think I'm aligned with John's replies as well.

Guozhang

On Fri, Nov 15, 2019 at 1:45 AM Vinoth Chandar 
wrote:

> >during restoring state the active might have some lag
>
> Great catch, yes.. we cannot assume lag = 0 for active. Lets report active
> lag as well then. If active is too laggy, the app can then deem the store
> partition unavailable (based on what the application is willing to
> tolerate).
>
> @matthias do you agree? We can then begin the vote.
>
> On Thu, Nov 14, 2019 at 9:03 AM Navinder Brar
>  wrote:
>
> > I agree with Vinoth and John on having "allLocalStoreOffsetLags()", all
> > actives don't have 0 lag, as during restoring state the active might have
> > some lag and one of the features of this KIP is to provide an option to
> > query from active (which might be in restoring state).
> > I will update the KIP with rejected alternatives and post this will start
> > a vote if everyone agrees on this.
> > On Thursday, 14 November, 2019, 09:34:52 pm IST, John Roesler <
> > j...@confluent.io> wrote:
> >
> >  Hi all,
> >
> > Thanks for the "reset", Vinoth. It brings some needed clarity to the
> > discussion.
> >
> > 10. My 2 cents: we might as well include the lags for the active
> > copies as well. This is a more context-free API. If we only include
> > standbys, this choice won't make sense to users unless they understand
> > that the active task cannot lag in the steady state, since it's the
> > source of updates. This isn't a bad thing to realize, but it's just
> > more mental overhead for the person who wants to list the lags for
> > "all local stores".
> >
> > Another reason is that we could consider also reporting the lag for
> > actives during recovery (when they would have non-zero lag). We don't
> > have to now, but if we choose to call the method "standby lags", then
> > we can't make this choice in the future.
> >
> > That said, it's just my opinion. I'm fine either way.
> >
> > 20. Vinoth's reply makes sense to me, fwiw.
> >
> > Beyond these two points, I'm happy with the current proposal.
> >
> > Thanks again,
> > -John
> >
> > On Thu, Nov 14, 2019 at 4:48 AM Vinoth Chandar 
> > wrote:
> > >
> > > 10. I considered that. Had to pick one or the other. Can just return
> > > standby too and rename method to may be “allLocalStandbyOffsetLags()”
> to
> > > have it explicit. (Standby should implicitly convey that we are talking
> > > about stores)
> > >
> > > 20. What I meant was, we are returning HostInfo instead of
> > StreamsMetadata
> > > since thats sufficient to route query; same for “int partition “ vs
> topic
> > > partition before. Previously KeyQueryMetadata had similar structure but
> > > used StreamsMetadata and TopicPartition objects to convey same
> > information
> > >
> > > @navinder KIP is already upto date with the email I sent, except for
> the
> > > reasonings I was laying out. +1 on revisiting rejected alternatives.
> > > Please make the follow up changes
> > >
> > > On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax  >
> > > wrote:
> > >
> > > > Thanks for the summary Vinoth!
> > > >
> > > > I buy the overall argument. Couple of clarification questions:
> > > >
> > > >
> > > > 10. Why do we need to include the active stores in
> > > > `allLocalStoreOffsetLags()`? Would it not be simpler to just return
> lag
> > > > for standbys?
> > > >
> > > >
> > > > 20: What does
> > > >
> > > > > Thin the KeyQueryMetadata object to just contain the minimum
> > information
> > > > > needed.
> > > >
> > > > exaclty mean? What is the "minimum information needed" ?
> > > >
> > > >
> > > > @Navinder: if you agree, can you update the KIP accoringly? With all
> > the
> > > > proposals, it's hard to keep track and it would be great to have the
> > > > current proposal summarized in the wiki page.
> > > >
> > > > Please also update the "Rejected alternative" sections to avoid that
> we
> > > > cycle back to old proposal (including the reason _why_ they got
> > rejected).
> > > >
> > > >
> > > > Thanks a lot!
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > >
> > > > On 11/13/19 7:10 PM, Vinoth Chandar wrote:
> > > > > Given we have had a healthy discussion on this topic for a month
> now
> > and
> > > > > still with many loose ends and open ended conversations, I thought
> It
> > > > would
> > > > > be worthwhile to take a step back and re-evaluate everything in the
> > > > context
> > > > > of the very real use-case and its specific scenarios.
> > > > >
> > > > > First, let's remind ourselves of the query routing flow of the
> > streams
> > > > > application ("app" here on)
> > > > >
> > > > >1. queries get routed to any random streams instance in the
> > cluster
> > > > >("router" here on)
> > > > >2. router then uses Streams metadata to pick active/standby
> > instances
> > > > >for that key's store/partition
> > > > >3. router instance also maintains global lag information for all
> > > > stores
> > > > >and all their partitions, by a 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Vinoth Chandar
>during restoring state the active might have some lag

Great catch, yes: we cannot assume lag = 0 for active. Let's report active
lag as well then. If the active is too laggy, the app can then deem the store
partition unavailable (based on what the application is willing to
tolerate).
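
For example, app-side logic could look roughly like this (just a sketch;
nothing here is proposed API, and the lag values are assumed to come from
whatever propagation mechanism the app uses):

    import java.util.Map;
    import java.util.Optional;
    import org.apache.kafka.streams.state.HostInfo;

    final class RoutingSketch {
        // Serve from the active if it is within the app's staleness bound,
        // otherwise fall back to the least-lagging standby within the bound,
        // otherwise treat the store partition as unavailable (empty result).
        static Optional<HostInfo> pickHost(final HostInfo active,
                                           final long activeLag,
                                           final Map<HostInfo, Long> standbyLags,
                                           final long maxTolerableLag) {
            if (activeLag <= maxTolerableLag) {
                return Optional.of(active);
            }
            return standbyLags.entrySet().stream()
                .filter(e -> e.getValue() <= maxTolerableLag)
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
        }
    }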

@matthias do you agree? We can then begin the vote.

On Thu, Nov 14, 2019 at 9:03 AM Navinder Brar
 wrote:

> I agree with Vinoth and John on having "allLocalStoreOffsetLags()", all
> actives don't have 0 lag, as during restoring state the active might have
> some lag and one of the features of this KIP is to provide an option to
> query from active (which might be in restoring state).
> I will update the KIP with rejected alternatives and post this will start
> a vote if everyone agrees on this.
> On Thursday, 14 November, 2019, 09:34:52 pm IST, John Roesler <
> j...@confluent.io> wrote:
>
>  Hi all,
>
> Thanks for the "reset", Vinoth. It brings some needed clarity to the
> discussion.
>
> 10. My 2 cents: we might as well include the lags for the active
> copies as well. This is a more context-free API. If we only include
> standbys, this choice won't make sense to users unless they understand
> that the active task cannot lag in the steady state, since it's the
> source of updates. This isn't a bad thing to realize, but it's just
> more mental overhead for the person who wants to list the lags for
> "all local stores".
>
> Another reason is that we could consider also reporting the lag for
> actives during recovery (when they would have non-zero lag). We don't
> have to now, but if we choose to call the method "standby lags", then
> we can't make this choice in the future.
>
> That said, it's just my opinion. I'm fine either way.
>
> 20. Vinoth's reply makes sense to me, fwiw.
>
> Beyond these two points, I'm happy with the current proposal.
>
> Thanks again,
> -John
>
> On Thu, Nov 14, 2019 at 4:48 AM Vinoth Chandar 
> wrote:
> >
> > 10. I considered that. Had to pick one or the other. Can just return
> > standby too and rename method to may be “allLocalStandbyOffsetLags()” to
> > have it explicit. (Standby should implicitly convey that we are talking
> > about stores)
> >
> > 20. What I meant was, we are returning HostInfo instead of
> StreamsMetadata
> > since thats sufficient to route query; same for “int partition “ vs topic
> > partition before. Previously KeyQueryMetadata had similar structure but
> > used StreamsMetadata and TopicPartition objects to convey same
> information
> >
> > @navinder KIP is already upto date with the email I sent, except for the
> > reasonings I was laying out. +1 on revisiting rejected alternatives.
> > Please make the follow up changes
> >
> > On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax 
> > wrote:
> >
> > > Thanks for the summary Vinoth!
> > >
> > > I buy the overall argument. Couple of clarification questions:
> > >
> > >
> > > 10. Why do we need to include the active stores in
> > > `allLocalStoreOffsetLags()`? Would it not be simpler to just return lag
> > > for standbys?
> > >
> > >
> > > 20: What does
> > >
> > > > Thin the KeyQueryMetadata object to just contain the minimum
> information
> > > > needed.
> > >
> > > exaclty mean? What is the "minimum information needed" ?
> > >
> > >
> > > @Navinder: if you agree, can you update the KIP accoringly? With all
> the
> > > proposals, it's hard to keep track and it would be great to have the
> > > current proposal summarized in the wiki page.
> > >
> > > Please also update the "Rejected alternative" sections to avoid that we
> > > cycle back to old proposal (including the reason _why_ they got
> rejected).
> > >
> > >
> > > Thanks a lot!
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > > On 11/13/19 7:10 PM, Vinoth Chandar wrote:
> > > > Given we have had a healthy discussion on this topic for a month now
> and
> > > > still with many loose ends and open ended conversations, I thought It
> > > would
> > > > be worthwhile to take a step back and re-evaluate everything in the
> > > context
> > > > of the very real use-case and its specific scenarios.
> > > >
> > > > First, let's remind ourselves of the query routing flow of the
> streams
> > > > application ("app" here on)
> > > >
> > > >1. queries get routed to any random streams instance in the
> cluster
> > > >("router" here on)
> > > >2. router then uses Streams metadata to pick active/standby
> instances
> > > >for that key's store/partition
> > > >3. router instance also maintains global lag information for all
> > > stores
> > > >and all their partitions, by a gossip/broadcast/heartbeat protocol
> > > (done
> > > >outside of Streams framework), but using
> KafkaStreams#allMetadata()
> > > for
> > > >streams instance discovery.
> > > >4. router then uses information in 2 & 3 to determine which
> instance
> > > to
> > > >send the query to  : always picks active instance if alive or the
> most
> > > >in-sync live standby otherwise.
> 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Navinder Brar
I agree with Vinoth and John on having "allLocalStoreOffsetLags()": not all
actives have 0 lag, since while restoring state the active might have some lag,
and one of the features of this KIP is to provide an option to query from the
active (which might be in a restoring state).
I will update the KIP with rejected alternatives and, after that, will start a
vote if everyone agrees on this.
On Thursday, 14 November, 2019, 09:34:52 pm IST, John Roesler 
 wrote:  
 
 Hi all,

Thanks for the "reset", Vinoth. It brings some needed clarity to the discussion.

10. My 2 cents: we might as well include the lags for the active
copies as well. This is a more context-free API. If we only include
standbys, this choice won't make sense to users unless they understand
that the active task cannot lag in the steady state, since it's the
source of updates. This isn't a bad thing to realize, but it's just
more mental overhead for the person who wants to list the lags for
"all local stores".

Another reason is that we could consider also reporting the lag for
actives during recovery (when they would have non-zero lag). We don't
have to now, but if we choose to call the method "standby lags", then
we can't make this choice in the future.

That said, it's just my opinion. I'm fine either way.

20. Vinoth's reply makes sense to me, fwiw.

Beyond these two points, I'm happy with the current proposal.

Thanks again,
-John

On Thu, Nov 14, 2019 at 4:48 AM Vinoth Chandar  wrote:
>
> 10. I considered that. Had to pick one or the other. Can just return
> standby too and rename method to may be “allLocalStandbyOffsetLags()” to
> have it explicit. (Standby should implicitly convey that we are talking
> about stores)
>
> 20. What I meant was, we are returning HostInfo instead of StreamsMetadata
> since thats sufficient to route query; same for “int partition “ vs topic
> partition before. Previously KeyQueryMetadata had similar structure but
> used StreamsMetadata and TopicPartition objects to convey same information
>
> @navinder KIP is already upto date with the email I sent, except for the
> reasonings I was laying out. +1 on revisiting rejected alternatives.
> Please make the follow up changes
>
> On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax 
> wrote:
>
> > Thanks for the summary Vinoth!
> >
> > I buy the overall argument. Couple of clarification questions:
> >
> >
> > 10. Why do we need to include the active stores in
> > `allLocalStoreOffsetLags()`? Would it not be simpler to just return lag
> > for standbys?
> >
> >
> > 20: What does
> >
> > > Thin the KeyQueryMetadata object to just contain the minimum information
> > > needed.
> >
> > exaclty mean? What is the "minimum information needed" ?
> >
> >
> > @Navinder: if you agree, can you update the KIP accoringly? With all the
> > proposals, it's hard to keep track and it would be great to have the
> > current proposal summarized in the wiki page.
> >
> > Please also update the "Rejected alternative" sections to avoid that we
> > cycle back to old proposal (including the reason _why_ they got rejected).
> >
> >
> > Thanks a lot!
> >
> >
> > -Matthias
> >
> >
> >
> > On 11/13/19 7:10 PM, Vinoth Chandar wrote:
> > > Given we have had a healthy discussion on this topic for a month now and
> > > still with many loose ends and open ended conversations, I thought It
> > would
> > > be worthwhile to take a step back and re-evaluate everything in the
> > context
> > > of the very real use-case and its specific scenarios.
> > >
> > > First, let's remind ourselves of the query routing flow of the streams
> > > application ("app" here on)
> > >
> > >    1. queries get routed to any random streams instance in the cluster
> > >    ("router" here on)
> > >    2. router then uses Streams metadata to pick active/standby instances
> > >    for that key's store/partition
> > >    3. router instance also maintains global lag information for all
> > stores
> > >    and all their partitions, by a gossip/broadcast/heartbeat protocol
> > (done
> > >    outside of Streams framework), but using KafkaStreams#allMetadata()
> > for
> > >    streams instance discovery.
> > >    4. router then uses information in 2 & 3 to determine which instance
> > to
> > >    send the query to  : always picks active instance if alive or the most
> > >    in-sync live standby otherwise.
> > >
> > > Few things to note :
> > >
> > > A) We choose to decouple how the lag information is obtained (control
> > > plane) from query path (data plane), since that provides more flexibility
> > > in designing the control plane. i.e pick any or combination of gossip,
> > > N-way broadcast, control the rate of propagation, piggybacking on request
> > > responses
> > > B) Since the app needs to do its own control plane, talking to other
> > > instances directly for failure detection & exchanging other metadata, we
> > > can leave the lag APIs added to KafkaStreams class itself local and
> > simply
> > > return lag for all store/partitions 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread John Roesler
Hi all,

Thanks for the "reset", Vinoth. It brings some needed clarity to the discussion.

10. My 2 cents: we might as well include the lags for the active
copies as well. This is a more context-free API. If we only include
standbys, this choice won't make sense to users unless they understand
that the active task cannot lag in the steady state, since it's the
source of updates. This isn't a bad thing to realize, but it's just
more mental overhead for the person who wants to list the lags for
"all local stores".

Another reason is that we could consider also reporting the lag for
actives during recovery (when they would have non-zero lag). We don't
have to now, but if we choose to call the method "standby lags", then
we can't make this choice in the future.
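
To make the point concrete, here's roughly how a caller could treat every
local store/partition uniformly (the map shape is purely hypothetical; the
proposed method doesn't exist yet and its exact return type is still being
discussed):

    import java.util.Map;

    final class LagListing {
        // No active/standby special-casing: a restoring active just shows up
        // with a non-zero lag like any standby would.
        static void print(final Map<String, Map<Integer, Long>> allLocalStoreOffsetLags) {
            allLocalStoreOffsetLags.forEach((store, byPartition) ->
                byPartition.forEach((partition, lag) ->
                    System.out.printf("%s-%d lag=%d%n", store, partition, lag)));
        }
    }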

That said, it's just my opinion. I'm fine either way.

20. Vinoth's reply makes sense to me, fwiw.

Beyond these two points, I'm happy with the current proposal.

Thanks again,
-John

On Thu, Nov 14, 2019 at 4:48 AM Vinoth Chandar  wrote:
>
> 10. I considered that. Had to pick one or the other. Can just return
> standby too and rename method to may be “allLocalStandbyOffsetLags()” to
> have it explicit. (Standby should implicitly convey that we are talking
> about stores)
>
> 20. What I meant was, we are returning HostInfo instead of StreamsMetadata
> since thats sufficient to route query; same for “int partition “ vs topic
> partition before. Previously KeyQueryMetadata had similar structure but
> used StreamsMetadata and TopicPartition objects to convey same information
>
> @navinder KIP is already upto date with the email I sent, except for the
> reasonings I was laying out. +1 on revisiting rejected alternatives.
> Please make the follow up changes
>
> On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax 
> wrote:
>
> > Thanks for the summary Vinoth!
> >
> > I buy the overall argument. Couple of clarification questions:
> >
> >
> > 10. Why do we need to include the active stores in
> > `allLocalStoreOffsetLags()`? Would it not be simpler to just return lag
> > for standbys?
> >
> >
> > 20: What does
> >
> > > Thin the KeyQueryMetadata object to just contain the minimum information
> > > needed.
> >
> > exaclty mean? What is the "minimum information needed" ?
> >
> >
> > @Navinder: if you agree, can you update the KIP accoringly? With all the
> > proposals, it's hard to keep track and it would be great to have the
> > current proposal summarized in the wiki page.
> >
> > Please also update the "Rejected alternative" sections to avoid that we
> > cycle back to old proposal (including the reason _why_ they got rejected).
> >
> >
> > Thanks a lot!
> >
> >
> > -Matthias
> >
> >
> >
> > On 11/13/19 7:10 PM, Vinoth Chandar wrote:
> > > Given we have had a healthy discussion on this topic for a month now and
> > > still with many loose ends and open ended conversations, I thought It
> > would
> > > be worthwhile to take a step back and re-evaluate everything in the
> > context
> > > of the very real use-case and its specific scenarios.
> > >
> > > First, let's remind ourselves of the query routing flow of the streams
> > > application ("app" here on)
> > >
> > >1. queries get routed to any random streams instance in the cluster
> > >("router" here on)
> > >2. router then uses Streams metadata to pick active/standby instances
> > >for that key's store/partition
> > >3. router instance also maintains global lag information for all
> > stores
> > >and all their partitions, by a gossip/broadcast/heartbeat protocol
> > (done
> > >outside of Streams framework), but using KafkaStreams#allMetadata()
> > for
> > >streams instance discovery.
> > >4. router then uses information in 2 & 3 to determine which instance
> > to
> > >send the query to  : always picks active instance if alive or the most
> > >in-sync live standby otherwise.
> > >
> > > Few things to note :
> > >
> > > A) We choose to decouple how the lag information is obtained (control
> > > plane) from query path (data plane), since that provides more flexibility
> > > in designing the control plane. i.e pick any or combination of gossip,
> > > N-way broadcast, control the rate of propagation, piggybacking on request
> > > responses
> > > B) Since the app needs to do its own control plane, talking to other
> > > instances directly for failure detection & exchanging other metadata, we
> > > can leave the lag APIs added to KafkaStreams class itself local and
> > simply
> > > return lag for all store/partitions on that instance.
> > > C) Streams preserves its existing behavior of instances only talking to
> > > each other through the Kafka brokers.
> > > D) Since the router treats active/standby differently, it would be good
> > for
> > > the KafkaStreams APIs to hand them back explicitly, with no additional
> > > logic needed for computing them. Specifically, the router only knows two
> > > things - key and store and if we just return a
> > Collection
> > > back, it 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Vinoth Chandar
10. I considered that. Had to pick one or the other. We can just return
standbys and rename the method to maybe “allLocalStandbyOffsetLags()” to
make it explicit. (Standby should implicitly convey that we are talking
about stores.)

20. What I meant was, we are returning HostInfo instead of StreamsMetadata
since that's sufficient to route the query; same for “int partition” vs the
topic partition before. Previously, KeyQueryMetadata had a similar structure
but used StreamsMetadata and TopicPartition objects to convey the same
information.
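
To spell it out, the thinned object would look roughly like this (a sketch of
the idea only; accessor names and exact fields are still up for discussion):

    import java.util.List;
    import org.apache.kafka.streams.state.HostInfo;

    public class KeyQueryMetadata {
        private final HostInfo activeHost;         // where the active copy lives
        private final List<HostInfo> standbyHosts; // where the standby copies live
        private final int partition;               // to correlate with propagated lag info

        public KeyQueryMetadata(final HostInfo activeHost,
                                final List<HostInfo> standbyHosts,
                                final int partition) {
            this.activeHost = activeHost;
            this.standbyHosts = standbyHosts;
            this.partition = partition;
        }

        public HostInfo getActiveHost() { return activeHost; }
        public List<HostInfo> getStandbyHosts() { return standbyHosts; }
        public int getPartition() { return partition; }
    }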

@navinder the KIP is already up to date with the email I sent, except for
the reasoning I was laying out. +1 on revisiting rejected alternatives.
Please make the follow-up changes.

On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax 
wrote:

> Thanks for the summary Vinoth!
>
> I buy the overall argument. Couple of clarification questions:
>
>
> 10. Why do we need to include the active stores in
> `allLocalStoreOffsetLags()`? Would it not be simpler to just return lag
> for standbys?
>
>
> 20: What does
>
> > Thin the KeyQueryMetadata object to just contain the minimum information
> > needed.
>
> exaclty mean? What is the "minimum information needed" ?
>
>
> @Navinder: if you agree, can you update the KIP accoringly? With all the
> proposals, it's hard to keep track and it would be great to have the
> current proposal summarized in the wiki page.
>
> Please also update the "Rejected alternative" sections to avoid that we
> cycle back to old proposal (including the reason _why_ they got rejected).
>
>
> Thanks a lot!
>
>
> -Matthias
>
>
>
> On 11/13/19 7:10 PM, Vinoth Chandar wrote:
> > Given we have had a healthy discussion on this topic for a month now and
> > still with many loose ends and open ended conversations, I thought It
> would
> > be worthwhile to take a step back and re-evaluate everything in the
> context
> > of the very real use-case and its specific scenarios.
> >
> > First, let's remind ourselves of the query routing flow of the streams
> > application ("app" here on)
> >
> >1. queries get routed to any random streams instance in the cluster
> >("router" here on)
> >2. router then uses Streams metadata to pick active/standby instances
> >for that key's store/partition
> >3. router instance also maintains global lag information for all
> stores
> >and all their partitions, by a gossip/broadcast/heartbeat protocol
> (done
> >outside of Streams framework), but using KafkaStreams#allMetadata()
> for
> >streams instance discovery.
> >4. router then uses information in 2 & 3 to determine which instance
> to
> >send the query to  : always picks active instance if alive or the most
> >in-sync live standby otherwise.
> >
> > Few things to note :
> >
> > A) We choose to decouple how the lag information is obtained (control
> > plane) from query path (data plane), since that provides more flexibility
> > in designing the control plane. i.e pick any or combination of gossip,
> > N-way broadcast, control the rate of propagation, piggybacking on request
> > responses
> > B) Since the app needs to do its own control plane, talking to other
> > instances directly for failure detection & exchanging other metadata, we
> > can leave the lag APIs added to KafkaStreams class itself local and
> simply
> > return lag for all store/partitions on that instance.
> > C) Streams preserves its existing behavior of instances only talking to
> > each other through the Kafka brokers.
> > D) Since the router treats active/standby differently, it would be good
> for
> > the KafkaStreams APIs to hand them back explicitly, with no additional
> > logic needed for computing them. Specifically, the router only knows two
> > things - key and store and if we just return a
> Collection
> > back, it cannot easily tease apart active and standby. Say, a streams
> > instance hosts the same store as both active and standby for different
> > partitions, matching by just storename the app will find it in both
> active
> > and standby lists.
> > E) From above, we assume the global lag estimate (lag per store topic
> > partition) are continuously reported amongst application instances and
> > already available on the router during step 2 above. Hence, attaching lag
> > APIs to StreamsMetadata is unnecessary and does not solve the needs
> anyway.
> > F) Currently returned StreamsMetadata object is really information about
> a
> > streams instance, that is not very specific to the key being queried.
> > Specifically, router has no knowledge of the topic partition a given key
> > belongs, this is needed for matching to the global lag information in
> step
> > 4 above (and as also the example code in the KIP showed before). The
> > StreamsMetadata, since it's about the instance itself, would contain all
> > topic partitions and stores on that instance, not specific to the given
> key.
> > G) A cleaner API would thin the amount of information returned to
> > specifically the given key and 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-12 Thread Vinoth Chandar
In all, is everyone OK with

 - Dropping KeyQueryMetadata and the allMetadataForKey() APIs
 - Dropping allLagInfo() from the KafkaStreams class; dropping the StoreLagInfo class
 - Adding offsetLag(store, key, serializer) -> Optional<Long> and
offsetLag(store, key, partitioner) -> Optional<Long> to StreamsMetadata
 - Duplicating the current methods for standby metadata in KafkaStreams:
allStandbyMetadata(), allStandbyMetadataForStore(), and two variants of
standbyMetadataForKey() (a sketch of these signatures follows below)
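
A sketch of how those signatures might read (generic parameters mirror the
existing metadataForKey() overloads; the Optional<Long> return type and exact
names are still subject to this discussion):

    import java.util.Collection;
    import java.util.Optional;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.streams.processor.StreamPartitioner;
    import org.apache.kafka.streams.state.StreamsMetadata;

    interface ProposedApiSketch {
        // added to StreamsMetadata
        <K> Optional<Long> offsetLag(String storeName, K key, Serializer<K> keySerializer);
        <K> Optional<Long> offsetLag(String storeName, K key, StreamPartitioner<? super K, ?> partitioner);

        // added to KafkaStreams, mirroring the existing active-only methods
        Collection<StreamsMetadata> allStandbyMetadata();
        Collection<StreamsMetadata> allStandbyMetadataForStore(String storeName);
        <K> Collection<StreamsMetadata> standbyMetadataForKey(String storeName, K key, Serializer<K> keySerializer);
        <K> Collection<StreamsMetadata> standbyMetadataForKey(String storeName, K key, StreamPartitioner<? super K, ?> partitioner);
    }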


Responses to Guozhang:

1.1 Like I mentioned before, the allStandbyMetadata() and
allStandbyMetadataForStore() complement existing allMetadata() and
allMetadataForStore(), since we don't want to change behavior of existing
APIs. Based on discussions so far, if we decide to drop KeyQueryMetadata,
then we will need to introduce 4 equivalents for standby metadata as
Matthias mentioned.
1.2 I am okay with pushing lag information to a method on StreamsMetadata,
like Matthias suggested (personally, I wouldn't design it like that, but I'm
happy to live with it). But assuming topic name <=> store name equivalency
for this mapping seems like a broken API to me. If all of the Streams code
were written like this, I could understand. But I don't think that's the case?
I would not be comfortable making such assumptions outside of public APIs.
>>look into each one's standby partition / stores to tell which one
StreamsMetadata is corresponding to the instance who holds a specific key
as standby, yes, but I feel this one extra iteration is worth to avoid
introducing a new class.
This sort of thing would lead to non-standardized, potentially buggy client
implementations, for something I'd expect the system to hand me directly.
I don't personally feel introducing a new class is so bad as to warrant making
the user do all this matching. Given the current APIs are not explicitly
named to denote active metadata, it gives us a chance to build something
more direct and clear, IMO. If we do the allMetadataForKey APIs, then we should
clearly separate active and standby ourselves. The alternative is separate
active and standby APIs, as Matthias suggests, which I can make peace with.

1.3 Similar to the above. In the Streams code, we treat topic partitions and
store names separately.
2.1 I think most databases build replication using logical offsets, not
time. Time lag can be a nice-to-have feature, but offset lag is fully
sufficient for a lot of use-cases.
2.2.1 We could support a lagInfoForStores() batch API. Makes sense.


Responses to Matthias:
(100) Streams can still keep the up-to-date version in memory, and the
implementation could for now just read this already-refreshed value.
Designing the API with the intent of pushing this to the user keeps the door
open for supporting time-based lag in the future.
(101) I am not sure what the criteria for evaluating the approaches here are.
Generally, when I am handed a metadata object, I don't expect to further
query it for more information semantically. I would also not force users to
make separate calls for active and standby metadata.
Well, that may be just me. So sure, we can push this into StreamsMetadata
if everyone agrees!
+1 on duplicating all 4 methods for standbys in this case.





On Tue, Nov 12, 2019 at 4:12 AM Navinder Brar
 wrote:

>
>- Looking back, I agree that 2 calls to StreamsMetadata to fetch
> StreamsMetadata and then using something like ‘long
> StreamsMetadata#offsetLag(store, key)’ which Matthias suggested is better
> than introducing a new public API i.e. ‘KeyQueryMetadata’. I will change
> the KIP accordingly.
>- >> I am actually not even sure, why we added
> `StreamsMetadata#topicPartitions()` originally
> I think it is helpful in showing which host holds which source topic
> partitions for /instances endpoint and when you query a key, you need to
> match the partition on which the key belongs to with the hosts holding
> source topic partitions for that partition. Is there any other way to get
> this info?
>
> On Tuesday, 12 November, 2019, 03:55:16 am IST, Guozhang Wang <
> wangg...@gmail.com> wrote:
>
>  Regarding 1.2: StreamsMetadata is 1-1 mapped to the streams instances, so
> 1) allMetadata would still return the same number of StreamsMetadata in
> collection, just that within the StreamsMetadata now you have new APIs to
> access standby partitions / stores. So I think it would not be a breaking
> change to the public API to not include `allStandbyMetadata` and `
> allStandbyMetadataForStore` but still rely on `allMetadata(ForStore)`?
>
> Regarding 1.1: Good point about the partition number. But I'm still
> wondering if it is definitely necessary to introduce a new
> `KeyQueryMetadata`
> interface class. E.g. suppose our function signature is
>
> Collection allMetadataForKey
>
> When you get the collection of StreamsMetadata you need to iterate over the
> collection and look into each one's standby partition / stores to tell
> which one StreamsMetadata is corresponding to the instance who holds a
> specific key as standby, yes, but I feel 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-12 Thread Navinder Brar
   
   - Looking back, I agree that making 2 calls, one to fetch StreamsMetadata
and then something like ‘long StreamsMetadata#offsetLag(store, key)’ as
Matthias suggested, is better than introducing a new public API, i.e.
‘KeyQueryMetadata’. I will change the KIP accordingly.
   - >> I am actually not even sure, why we added
`StreamsMetadata#topicPartitions()` originally
I think it is helpful in showing which host holds which source topic partitions
for the /instances endpoint, and when you query a key, you need to match the
partition the key belongs to with the hosts holding the source topic partitions
for that partition. Is there any other way to get this info? (A sketch of this
matching follows below.)
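
Roughly, with the existing APIs, that matching looks like the sketch below (it
assumes the caller already knows the store's source topic and the partition the
key maps to):

    import java.util.Optional;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.HostInfo;
    import org.apache.kafka.streams.state.StreamsMetadata;

    final class KeyRoutingSketch {
        // Find the instance whose topicPartitions() contain the key's partition
        // of the store's source topic.
        static Optional<HostInfo> hostFor(final KafkaStreams streams,
                                          final String storeName,
                                          final String sourceTopic,
                                          final int keyPartition) {
            return streams.allMetadataForStore(storeName).stream()
                .filter(meta -> meta.topicPartitions().stream()
                    .anyMatch(tp -> tp.topic().equals(sourceTopic) && tp.partition() == keyPartition))
                .map(StreamsMetadata::hostInfo)
                .findFirst();
        }
    }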

On Tuesday, 12 November, 2019, 03:55:16 am IST, Guozhang Wang 
 wrote:  
 
 Regarding 1.2: StreamsMetadata is 1-1 mapped to the streams instances, so
1) allMetadata would still return the same number of StreamsMetadata in
collection, just that within the StreamsMetadata now you have new APIs to
access standby partitions / stores. So I think it would not be a breaking
change to the public API to not include `allStandbyMetadata` and `
allStandbyMetadataForStore` but still rely on `allMetadata(ForStore)`?

Regarding 1.1: Good point about the partition number. But I'm still
wondering if it is definitely necessary to introduce a new `KeyQueryMetadata`
interface class. E.g. suppose our function signature is

Collection allMetadataForKey

When you get the collection of StreamsMetadata you need to iterate over the
collection and look into each one's standby partition / stores to tell
which one StreamsMetadata is corresponding to the instance who holds a
specific key as standby, yes, but I feel this one extra iteration is worth
to avoid introducing a new class.


Guozhang

On Sat, Nov 9, 2019 at 10:04 PM Matthias J. Sax 
wrote:

> I agree, that we might want to drop the time-base lag for the initial
> implementation. There is no good way to get this information without a
> broker side change.
>
>
>
> (100) For the offset lag information, I don't see a reason why the app
> should drive when this information is updated though, because KS will
> update this information anyway (once per `commit.interval.ms` -- and
> updating it more frequently does not make sense, as it most likely won't
> change more frequently anyway).
>
> If you all insist that the app should drive it, I can live with it, but
> I think it makes the API unnecessarily complex without a benefit.
>
>
>
> (101) I still don't understand why we need to have `KeyQueryMetadata`
> though. Note, that an instance can only report lag for it's local
> stores, but not remote stores as it does not know to what offset a
> remote standby has caught up to.
>
> > Because we needed to return the topicPartition the key belongs to, in
> > order to correlate with the lag information from the other set of APIs.
>
> My suggestion is to get the lag information from `StreamsMetadata` --
> which partition the store belongs to can be completely encapsulated
> within KS as all information is local, and I don't think we need to
> expose it to the user at all.
>
> We can just add `long StreamsMetadata#offsetLag(store, key)`. If the
> store is local we return its lag, if it's remote we return `-1` (ie,
> UNKNOWN). As an alternative, we can change the return type to
> `Optional`. This works for active and standby task alike.
>
> Note, that a user must verify if `StreamsMeatadata` is for itself
> (local) or remote anyway. We only need to provide a way that allows
> users to distinguish between active an standby. (More below.)
>
>
> I am actually not even sure, why we added
> `StreamsMetadata#topicPartitions()` originally -- seems pretty useless.
> Can we deprecate it as side cleanup in this KIP? Or do I miss something?
>
>
>
> (102)
>
> > There are basically 2 reasons. One is that instead of having two
> functions, one to get StreamsMetadata for active and one for replicas. We
> are fetching both in a single call and we have a way to get only active or
> only replicas from the KeyQueryMetadata object(just like isStandby() and
> isActive() discussion we had earlier)
>
> I understand, that we need two methods. However, I think we can simplify
> the API and not introduce `KeyQueryMetadata`, but just "duplicate" all 4
> existing methods for standby tasks:
>
> // note that `standbyMetadataForKey` return a Collection in contrast to
> existing `metadataForKey`
>
> >  Collection allStandbyMetadata()
> >  Collection allStandbyMetadataForStore(String
> storeName)
> >  Collection metadataForKey(String storeName, K key,
> Serializer keySerializer)
> >  Collection metadataForKey(String storeName, K key,
> StreamPartitioner partitioner)
>
> Because the existing methods return all active metadata, there is no
> reason to return `KeyQueryMetadata` as it's more complicated to get the
> standby metadata. With `KeyQueryMetadata` the user needs to make more
> calls to get the metadata:
>
>  

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-11 Thread Guozhang Wang
Regarding 1.2: StreamsMetadata is 1-1 mapped to the streams instances, so
allMetadata would still return the same number of StreamsMetadata in the
collection; it's just that within the StreamsMetadata you now have new APIs
to access standby partitions / stores. So I think it would not be a breaking
change to the public API to not include `allStandbyMetadata` and
`allStandbyMetadataForStore` but still rely on `allMetadata(ForStore)`?

Regarding 1.1: Good point about the partition number. But I'm still
wondering if it is definitely necessary to introduce a new `KeyQueryMetadata`
interface class. E.g. suppose our function signature is

Collection<StreamsMetadata> allMetadataForKey(...)

When you get the collection of StreamsMetadata you need to iterate over the
collection and look into each one's standby partitions / stores to tell
which StreamsMetadata corresponds to the instance that holds a specific key
as a standby, yes, but I feel this one extra iteration is worth it to avoid
introducing a new class (a sketch of that iteration follows below).
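
To make that concrete, the extra iteration would look roughly like the sketch
below (it assumes StreamsMetadata gains an accessor for its standby store
names, e.g. a standbyStateStoreNames(); the exact accessor is part of what this
KIP would add):

    import java.util.Collection;
    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.kafka.streams.state.StreamsMetadata;

    final class StandbySplitSketch {
        // Callers split the returned collection themselves: keep only instances
        // that host the given store as a standby.
        static List<StreamsMetadata> standbysFor(final Collection<StreamsMetadata> allForKey,
                                                 final String storeName) {
            return allForKey.stream()
                .filter(meta -> meta.standbyStateStoreNames().contains(storeName))
                .collect(Collectors.toList());
        }
    }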


Guozhang

On Sat, Nov 9, 2019 at 10:04 PM Matthias J. Sax 
wrote:

> I agree, that we might want to drop the time-base lag for the initial
> implementation. There is no good way to get this information without a
> broker side change.
>
>
>
> (100) For the offset lag information, I don't see a reason why the app
> should drive when this information is updated though, because KS will
> update this information anyway (once per `commit.interval.ms` -- and
> updating it more frequently does not make sense, as it most likely won't
> change more frequently anyway).
>
> If you all insist that the app should drive it, I can live with it, but
> I think it makes the API unnecessarily complex without a benefit.
>
>
>
> (101) I still don't understand why we need to have `KeyQueryMetadata`
> though. Note, that an instance can only report lag for it's local
> stores, but not remote stores as it does not know to what offset a
> remote standby has caught up to.
>
> > Because we needed to return the topicPartition the key belongs to, in
> > order to correlate with the lag information from the other set of APIs.
>
> My suggestion is to get the lag information from `StreamsMetadata` --
> which partition the store belongs to can be completely encapsulated
> within KS as all information is local, and I don't think we need to
> expose it to the user at all.
>
> We can just add `long StreamsMetadata#offsetLag(store, key)`. If the
> store is local we return its lag, if it's remote we return `-1` (ie,
> UNKNOWN). As an alternative, we can change the return type to
> `Optional`. This works for active and standby task alike.
>
> Note, that a user must verify if `StreamsMeatadata` is for itself
> (local) or remote anyway. We only need to provide a way that allows
> users to distinguish between active an standby. (More below.)
>
>
> I am actually not even sure, why we added
> `StreamsMetadata#topicPartitions()` originally -- seems pretty useless.
> Can we deprecate it as side cleanup in this KIP? Or do I miss something?
>
>
>
> (102)
>
> > There are basically 2 reasons. One is that instead of having two
> functions, one to get StreamsMetadata for active and one for replicas. We
> are fetching both in a single call and we have a way to get only active or
> only replicas from the KeyQueryMetadata object(just like isStandby() and
> isActive() discussion we had earlier)
>
> I understand, that we need two methods. However, I think we can simplify
> the API and not introduce `KeyQueryMetadata`, but just "duplicate" all 4
> existing methods for standby tasks:
>
> // note that `standbyMetadataForKey` return a Collection in contrast to
> existing `metadataForKey`
>
> >   Collection allStandbyMetadata()
> >   Collection allStandbyMetadataForStore(String
> storeName)
> >   Collection metadataForKey(String storeName, K key,
> Serializer keySerializer)
> >   Collection metadataForKey(String storeName, K key,
> StreamPartitioner partitioner)
>
> Because the existing methods return all active metadata, there is no
> reason to return `KeyQueryMetadata` as it's more complicated to get the
> standby metadata. With `KeyQueryMetadata` the user needs to make more
> calls to get the metadata:
>
>   KafkaStreams#allMetadataForKey()
>   #getActive()
>
>   KafkaStreams#allMetadataForKey()
>   #getStandby()
>
> vs:
>
>   KafkaStreams#metadataForKey()
>
>   KafkaStreams#standbyMetadataForKey()
>
> The wrapping of both within `KeyQueryMetadata` does not seem to provide
> any benefit but increase our public API surface.
>
>
>
> @Guozhang:
>
> (1.1. + 1.2.) From my understanding `allMetadata()` (and other existing
> methods) will only return the metadata of _active_ tasks for backward
> compatibility reasons. If we would return standby metadata, existing
> code would potentially "break" because the code might pick a standby to
> query a key without noticing.
>
>
>
> -Matthias
>
>
> On 11/8/19 6:07 AM, Navinder Brar wrote:
> > Thanks, Guozhang for going through it 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-09 Thread Matthias J. Sax
I agree that we might want to drop the time-based lag for the initial
implementation. There is no good way to get this information without a
broker side change.



(100) For the offset lag information, I don't see a reason why the app
should drive when this information is updated though, because KS will
update this information anyway (once per `commit.interval.ms` -- and
updating it more frequently does not make sense, as it most likely won't
change more frequently anyway).

If you all insist that the app should drive it, I can live with it, but
I think it makes the API unnecessarily complex without a benefit.



(101) I still don't understand why we need to have `KeyQueryMetadata`
though. Note that an instance can only report lag for its local
stores, but not remote stores, as it does not know what offset a
remote standby has caught up to.

> Because we needed to return the topicPartition the key belongs to, in
> order to correlate with the lag information from the other set of APIs.

My suggestion is to get the lag information from `StreamsMetadata` --
which partition the store belongs to can be completely encapsulated
within KS as all information is local, and I don't think we need to
expose it to the user at all.

We can just add `long StreamsMetadata#offsetLag(store, key)`. If the
store is local we return its lag; if it's remote we return `-1` (i.e.,
UNKNOWN). As an alternative, we can change the return type to
`Optional<Long>`. This works for active and standby tasks alike.
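
For clarity, the two shapes side by side (a sketch only, not existing API):

    import java.util.Optional;

    interface OffsetLagSketch<K> {
        // variant 1: sentinel, -1 means remote / unknown
        long offsetLag(String storeName, K key);

        // variant 2: empty Optional instead of a sentinel
        Optional<Long> offsetLagOptional(String storeName, K key);
    }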

Note that a user must verify whether `StreamsMetadata` is for itself
(local) or remote anyway. We only need to provide a way that allows
users to distinguish between active and standby. (More below.)


I am actually not even sure, why we added
`StreamsMetadata#topicPartitions()` originally -- seems pretty useless.
Can we deprecate it as side cleanup in this KIP? Or do I miss something?



(102)

> There are basically 2 reasons. One is that instead of having two functions, 
> one to get StreamsMetadata for active and one for replicas. We are fetching 
> both in a single call and we have a way to get only active or only replicas 
> from the KeyQueryMetadata object(just like isStandby() and isActive() 
> discussion we had earlier)

I understand, that we need two methods. However, I think we can simplify
the API and not introduce `KeyQueryMetadata`, but just "duplicate" all 4
existing methods for standby tasks:

// note that `standbyMetadataForKey` returns a Collection in contrast to
the existing `metadataForKey`

>   Collection<StreamsMetadata> allStandbyMetadata()
>   Collection<StreamsMetadata> allStandbyMetadataForStore(String storeName)
>   Collection<StreamsMetadata> standbyMetadataForKey(String storeName, K key,
> Serializer<K> keySerializer)
>   Collection<StreamsMetadata> standbyMetadataForKey(String storeName, K key,
> StreamPartitioner<? super K, ?> partitioner)

Because the existing methods return all active metadata, there is no
reason to return `KeyQueryMetadata` as it's more complicated to get the
standby metadata. With `KeyQueryMetadata` the user needs to make more
calls to get the metadata:

  KafkaStreams#allMetadataForKey()
  #getActive()

  KafkaStreams#allMetadataForKey()
  #getStandby()

vs:

  KafkaStreams#metadataForKey()

  KafkaStreams#standbyMetadataForKey()

The wrapping of both within `KeyQueryMetadata` does not seem to provide
any benefit but increase our public API surface.



@Guozhang:

(1.1. + 1.2.) From my understanding `allMetadata()` (and other existing
methods) will only return the metadata of _active_ tasks for backward
compatibility reasons. If we were to return standby metadata, existing
code would potentially "break" because the code might pick a standby to
query a key without noticing.



-Matthias


On 11/8/19 6:07 AM, Navinder Brar wrote:
> Thanks, Guozhang for going through it again.
>
>- 1.1 & 1.2: The main point of adding topicPartition in KeyQueryMetadata 
> is not topicName, but the partition number. I agree changelog topicNames and 
> store names will have 1-1 mapping but we also need the partition number of 
> the changelog for which are calculating the lag. Now we can add partition 
> number in StreamsMetadata but it will be orthogonal to the definition of 
> StreamsMetadata i.e.- “Represents the state of an instance (process) in a 
> {@link KafkaStreams} application.”  If we add partition number in this, it 
> doesn’t stay metadata for an instance, because now it is storing the 
> partition information for a key being queried. So, having “KeyQueryMetadata” 
> simplifies this as now it contains all the metadata and also changelog and 
> partition information for which we need to calculate the lag.     
>
> Another way is having another function in parallel to metadataForKey, which 
> returns the partition number for the key being queried. But then we would 
> need 2 calls to StreamsMetadataState, once to fetch metadata and another to 
> fetch partition number. Let me know if any of these two ways seem more 
> intuitive than KeyQueryMetadata then we can 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-08 Thread Navinder Brar
Thanks, Guozhang for going through it again.
   
   - 1.1 & 1.2: The main point of adding topicPartition in KeyQueryMetadata is 
not topicName, but the partition number. I agree changelog topicNames and store 
names will have 1-1 mapping but we also need the partition number of the 
changelog for which are calculating the lag. Now we can add partition number in 
StreamsMetadata but it will be orthogonal to the definition of StreamsMetadata 
i.e.- “Represents the state of an instance (process) in a {@link KafkaStreams} 
application.”  If we add partition number in this, it doesn’t stay metadata for 
an instance, because now it is storing the partition information for a key 
being queried. So, having “KeyQueryMetadata” simplifies this as now it contains 
all the metadata and also changelog and partition information for which we need 
to calculate the lag.     
   
Another way is having another function, in parallel to metadataForKey, which
returns the partition number for the key being queried. But then we would need
2 calls to StreamsMetadataState: one to fetch the metadata and another to fetch
the partition number. Let me know if either of these two ways seems more
intuitive than KeyQueryMetadata, and then we can try to converge on one.
   - 1.3:  Again, it is required for the partition number. We can drop store 
name though.
   - 2.1: I think this was done in accordance with the opinion from John, as
time lag would be better implemented with a broker-level change while offset
lag is readily implementable. @vinoth?
   - 2.2.1: Good point.  +1
   - 2.2.2: I am not well aware of it, @vinoth any comments?
   - 3.1: I think we have already agreed on dropping this; we need to update
the KIP. Also, is there any opinion on lagInfoForStore(String storeName) vs
lagInfoForStore(String storeName, int partition)?
   - 3.2: But in functions such as onAssignment() and onPartitionsAssigned(),
for standbyTasks as well, the topicPartitions we use are input topic partitions
and not changelog partitions. Would this break from those semantics?
 

On Thursday, 7 November, 2019, 11:33:19 pm IST, Guozhang Wang 
 wrote:  
 
 Hi Navinder, Vinoth, thanks for the updated KIP!

Read through the discussions so far and made another pass on the wiki page,
and here are some more comments:

1. About the public APIs:

1.1. It is not clear to me how allStandbyMetadataForStore
and allStandbyMetadata would be differentiated from the original APIs given
that we will augment StreamsMetadata to include both active and standby
topic-partitions and store names, so I think we can still use allMetadata
and allMetadataForStore to get the collection of instance metadata that
host the store both as active and standbys. Are there any specific use
cases where we ONLY want to get the standby's metadata? And even if there
are, we can easily filter it out from the allMetadata / allMetadataForStore
right?

1.2. Similarly I'm wondering for allMetadataForKey, can we return the same
type: "Collection" which includes 1 for active, and N-1
for standbys, and callers can easily identify them by looking inside the
StreamsMetadata objects? In addition I feel the "topicPartition" field
inside "KeyQueryMetadata" is not very important since the changelog
topic-name is always 1-1 mapping to the store name, so as long as the store
name matches, the changelog topic name should always match (i.e. in
the pseudo code, just checking store names should be sufficient). If all of
the above assumption is true, I think we can save us from introducing one
more public class here.

1.3. Similarly in StoreLagInfo, seems not necessary to include the topic
partition name in addition to the store name.

2. About querying store lags: we've discussed about separating the querying
of the lag information and the querying of the host information so I still
support having separate APIs here. More thoughts:

2.1. Compared with offsets, I'm wondering would time-difference be more
intuitive for users to define the acceptable "staleness"? More strictly,
are there any scenarios where we would actually prefer offsets over
timestamps except that the timestamps are not available?

2.2. I'm also a bit leaning towards not putting the burden of periodically
refreshing our lag and caching it (and introducing another config) on the
streams side but document clearly its cost and let users to consider its
call frequency; of course in terms of implementation there are some
optimizations we can consider:

1) for restoring active tasks, the log-end-offset is read once since it is
not expected to change, and that offset / timestamp can be remembered for
lag calculation and we do not need to refresh again;
2) for standby tasks,  there's a "Map
endOffsets(Collection partitions)" in KafkaConsumer to
batch a list of topic-partitions in one round-trip, and we can use that to
let our APIs be sth. like "lagInfoForStores(Collection storeNames)"
to enable the batching effects.

3. Misc.:

3.1 There's a typo on the pseudo code 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-07 Thread Guozhang Wang
Hi Navinder, Vinoth, thanks for the updated KIP!

Read through the discussions so far and made another pass on the wiki page,
and here are some more comments:

1. About the public APIs:

1.1. It is not clear to me how allStandbyMetadataForStore
and allStandbyMetadata would be differentiated from the original APIs given
that we will augment StreamsMetadata to include both active and standby
topic-partitions and store names, so I think we can still use allMetadata
and allMetadataForStore to get the collection of instance metadata that
host the store both as active and standbys. Are there any specific use
cases where we ONLY want to get the standby's metadata? And even if there
are, we can easily filter it out from the allMetadata / allMetadataForStore
right?

1.2. Similarly I'm wondering for allMetadataForKey, can we return the same
type: "Collection" which includes 1 for active, and N-1
for standbys, and callers can easily identify them by looking inside the
StreamsMetadata objects? In addition I feel the "topicPartition" field
inside "KeyQueryMetadata" is not very important since the changelog
topic-name is always 1-1 mapping to the store name, so as long as the store
name matches, the changelog topic name should always match (i.e. in
the pseudo code, just checking store names should be sufficient). If all of
the above assumption is true, I think we can save us from introducing one
more public class here.
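
Purely to illustrate the shape I have in mind -- the signature below is only a
sketch mirroring the existing metadataForKey(), not a concrete proposal:

// hypothetical: same lookup as metadataForKey(), but returning every replica;
// callers tell active and standbys apart by inspecting the StreamsMetadata itself
<K> Collection<StreamsMetadata> allMetadataForKey(final String storeName,
                                                  final K key,
                                                  final Serializer<K> keySerializer);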

1.3. Similarly in StoreLagInfo, seems not necessary to include the topic
partition name in addition to the store name.

2. About querying store lags: we've discussed about separating the querying
of the lag information and the querying of the host information so I still
support having separate APIs here. More thoughts:

2.1. Compared with offsets, I'm wondering would time-difference be more
intuitive for users to define the acceptable "staleness"? More strictly,
are there any scenarios where we would actually prefer offsets over
timestamps except that the timestamps are not available?

2.2. I'm also a bit leaning towards not putting the burden of periodically
refreshing our lag and caching it (and introducing another config) on the
streams side, but document clearly its cost and let users consider its
call frequency; of course in terms of implementation there are some
optimizations we can consider:

1) for restoring active tasks, the log-end-offset is read once since it is
not expected to change, and that offset / timestamp can be remembered for
lag calculation and we do not need to refresh again;
2) for standby tasks, there's a "Map<TopicPartition, Long>
endOffsets(Collection<TopicPartition> partitions)" in KafkaConsumer to
batch a list of topic-partitions in one round-trip, and we can use that to
let our APIs be sth. like "lagInfoForStores(Collection<String> storeNames)"
to enable the batching effects.
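
To make the batching concrete, here is a rough sketch of what I mean -- the
helpers "changelogPartitionsFor" and "consumedOffsetFor" are made-up names that
just stand for information the instance already has locally:

// one consumer round-trip for all changelog partitions of the requested stores
final Map<TopicPartition, Long> endOffsets =
    consumer.endOffsets(changelogPartitionsFor(storeNames));
final Map<TopicPartition, Long> offsetLags = new HashMap<>();
for (final Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
    final long consumed = consumedOffsetFor(entry.getKey()); // tracked locally per task
    offsetLags.put(entry.getKey(), Math.max(0L, entry.getValue() - consumed));
}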

3. Misc.:

3.1 There's a typo on the pseudo code "globalLagInforation". Also it seems
not describing how that information is collected (personally I also feel
one "lagInfoForStores" is sufficient).
3.2 Note there's a slight semantic difference between active and
standby's "partitions" inside StreamsMetadata, for active tasks the
partitions are actually input topic partitions for the task: some of them
may also act as changelog topics but these are exceptional cases; for
standby tasks the "standbyTopicPartitions" are actually the changelog
topics of the task. So maybe renaming it to "standbyChangelogPartitions" to
differentiate it?


Overall I think this would be a really good KIP to add to Streams, thank
you so much!


Guozhang

On Wed, Nov 6, 2019 at 8:47 PM Navinder Brar
 wrote:

> +1 on implementing offset based lag for now and push time-based lag to a
> later point in time when broker changes are done. Although time-based lag
> enhances the readability, it would not be a make or break change for
> implementing this KIP.
>
> Vinoth has explained the role of KeyQueryMetadata, let me add in my 2
> cents as well.
>    - There are basically 2 reasons. One is that instead of having two
> functions, one to get StreamsMetadata for active and one for replicas, we
> are fetching both in a single call, and we have a way to get only active or
> only replicas from the KeyQueryMetadata object (just like the isStandby() and
> isActive() discussion we had earlier).
>    - Since even after fetching the metadata we still have a requirement of
> fetching the topicPartition for which the query came, in order to fetch lag for
> that specific topicPartition. Instead of having another call to fetch the
> partition from StreamsMetadataState, we thought using one single call and
> fetching the partition and all metadata together would be better.
>    - Another option was to change the StreamsMetadata object and add the
> topicPartition for which the query came, but that doesn't make sense in terms
> of semantics, as it is StreamsMetadata. Also, KeyQueryMetadata
> represents all the metadata for the Key being queried, i.e. the partition
> it belongs to and the list of StreamsMetadata (hosts), active 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-06 Thread Navinder Brar
+1 on implementing offset based lag for now and push time-based lag to a later 
point in time when broker changes are done. Although time-based lag enhances 
the readability, it would not be a make or break change for implementing this 
KIP. 

Vinoth has explained the role of KeyQueryMetadata, let me add in my 2 cents
as well.
   - There are basically 2 reasons. One is that instead of having two
functions, one to get StreamsMetadata for active and one for replicas, we are
fetching both in a single call, and we have a way to get only active or only
replicas from the KeyQueryMetadata object (just like the isStandby() and isActive()
discussion we had earlier).
   - Since even after fetching the metadata we still have a requirement of
fetching the topicPartition for which the query came, in order to fetch lag for that
specific topicPartition. Instead of having another call to fetch the partition
from StreamsMetadataState, we thought using one single call and fetching the
partition and all metadata together would be better.
   - Another option was to change the StreamsMetadata object and add the topicPartition
for which the query came, but that doesn't make sense in terms of
semantics, as it is StreamsMetadata. Also, KeyQueryMetadata represents all the
metadata for the Key being queried, i.e. the partition it belongs to and the
list of StreamsMetadata (hosts), active or replica, where the key could be found.
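
To make the above concrete, this is roughly the shape being discussed for
KeyQueryMetadata (the field and method names here are illustrative sketches and
may still change in the KIP):

public class KeyQueryMetadata {
    // where the active copy of the key's store partition is hosted
    private final StreamsMetadata activeMetadata;
    // replicas that could serve a (possibly stale) read for the same key
    private final Collection<StreamsMetadata> standbyMetadata;
    // the store/changelog partition the queried key maps to, used to look up lag
    private final TopicPartition topicPartition;

    public KeyQueryMetadata(final StreamsMetadata activeMetadata,
                            final Collection<StreamsMetadata> standbyMetadata,
                            final TopicPartition topicPartition) {
        this.activeMetadata = activeMetadata;
        this.standbyMetadata = standbyMetadata;
        this.topicPartition = topicPartition;
    }

    public StreamsMetadata getActiveMetadata() { return activeMetadata; }
    public Collection<StreamsMetadata> getStandbyMetadata() { return standbyMetadata; }
    public TopicPartition getTopicPartition() { return topicPartition; }
}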
   




On Thursday, 7 November, 2019, 01:53:36 am IST, Vinoth Chandar 
 wrote:  
 
 +1 to John, suggestion on Duration/Instant and dropping the API to fetch
all store's lags. However, I do think we need to return lags per topic
partition. So not sure if single return value would work? We need some new
class that holds a TopicPartition and Duration/Instant variables together?

10) Because we needed to return the topicPartition the key belongs to, in
order to correlate with the lag information from the other set of APIs.
Otherwise, we don't know which topic partition's lag estimate to use. We
tried to illustrate this on the example code. StreamsMetadata is simply
capturing state of a streams host/instance, where as TopicPartition depends
on the key passed in. This is a side effect of our decision to decouple lag
based filtering on the metadata apis.

20) Goes back to the previous point. We needed to return information that
is key specific, at which point it seemed natural for the KeyQueryMetadata
to contain active, standby, topic partition for that key. If we merely
returned a standbyMetadataForKey() -> Collection<StreamsMetadata> standby,
an active metadataForKey() -> StreamsMetadata, and a new
getTopicPartition(key) -> topicPartition object back to the caller, then
arguably you could do the same kind of correlation. IMO having the
KeyQueryMetadata class to encapsulate all this is a friendlier API.
 allStandbyMetadata() and allStandbyMetadataForStore() are just counterparts
for metadataForStore() and allMetadata() that we introduce mostly for
consistent API semantics. (their presence implicitly could help denote
metadataForStore() is for active instances. Happy to drop them if their
utility is not clear)

30) This would assume we refresh all the standby lag information every
time we query for that StreamsMetadata for a specific store? For time based
lag, this will involve fetching the tail kafka record at once from multiple
kafka topic partitions? I would prefer not to couple them like this and
have the ability to make granular store (or even topic partition level)
fetches for lag information.

32) I actually prefer John's suggestion to let the application drive the
lag fetches/updates and not have flags as the KIP currently proposes. Are
you reexamining that position?

On fetching lag information, +1 we could do this much more efficiently with
a broker change. Given I don't yet have a burning need for the time-based
lag, I think we can sequence the APIs such that the offset based ones are
implemented first, while we have a broker side change?
Given we decoupled the offset and time based lag API, I am willing to drop
the time based lag functionality (since its not needed right away for my
use-case). @navinder . thoughts?


On Tue, Nov 5, 2019 at 11:10 PM Matthias J. Sax 
wrote:

> Navinder,
>
> thanks for updating the KIP. Couple of follow up questions:
>
>
> (10) Why do we need to introduce the class `KeyQueryMetadata`?
>
> (20) Why do we introduce the two methods `allMetadataForKey()`? Would it
> not be simpler to add `Collection<StreamsMetadata>
> standbyMetadataForKey(...)`? This would align with new methods
> `#allStandbyMetadata()` and `#allStandbyMetadataForStore()`?
>
> (30) Why do we need the class `StoreLagInfo` -- it seems simpler to just
> extend `StreamsMetadata` with the corresponding attributes and methods
> (for active tasks, the lag would always be reported as zero)
>
> (32) Via (30) we can avoid the two new methods `#allLagInfo()` and
> `#lagInfoForStore()`, too, reducing public API and making it simpler to
> use the feature.
>
> Btw: If 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-06 Thread Vinoth Chandar
+1 to John, suggestion on Duration/Instant and dropping the API to fetch
all store's lags. However, I do think we need to return lags per topic
partition. So not sure if single return value would work? We need some new
class that holds a TopicPartition and Duration/Instant variables together?

10) Because we needed to return the topicPartition the key belongs to, in
order to correlate with the lag information from the other set of APIs.
Otherwise, we don't know which topic partition's lag estimate to use. We
tried to illustrate this on the example code. StreamsMetadata is simply
capturing the state of a streams host/instance, whereas TopicPartition depends
on the key passed in. This is a side effect of our decision to decouple lag
based filtering on the metadata apis.

20) Goes back to the previous point. We needed to return information that
is key specific, at which point it seemed natural for the KeyQueryMetadata
to contain active, standby, topic partition for that key. If we merely
returned a standbyMetadataForKey() -> Collection<StreamsMetadata> standby,
an active metadataForKey() -> StreamsMetadata, and a new
getTopicPartition(key) -> topicPartition object back to the caller, then
arguably you could do the same kind of correlation. IMO having the
KeyQueryMetadata class to encapsulate all this is a friendlier API.
 allStandbyMetadata() and allStandbyMetadataForStore() are just counterparts
for metadataForStore() and allMetadata() that we introduce mostly for
consistent API semantics. (their presence implicitly could help denote
metadataForStore() is for active instances. Happy to drop them if their
utility is not clear)

30) This would assume we refresh all the standby lag information every
time we query for that StreamsMetadata for a specific store? For time based
lag, this will involve fetching the tail kafka record at once from multiple
kafka topic partitions? I would prefer not to couple them like this and
have the ability to make granular store (or even topic partition level)
fetches for lag information.

32) I actually prefer John's suggestion to let the application drive the
lag fetches/updates and not have flags as the KIP currently proposes. Are
you reexamining that position?

On fetching lag information, +1 we could do this much more efficiently with
a broker change. Given I don't yet have a burning need for the time-based
lag, I think we can sequence the APIs such that the offset based ones are
implemented first, while we have a broker side change?
Given we decoupled the offset and time based lag API, I am willing to drop
the time based lag functionality (since its not needed right away for my
use-case). @navinder . thoughts?


On Tue, Nov 5, 2019 at 11:10 PM Matthias J. Sax 
wrote:

> Navinder,
>
> thanks for updating the KIP. Couple of follow up questions:
>
>
> (10) Why do we need to introduce the class `KeyQueryMetadata`?
>
> (20) Why do we introduce the two methods `allMetadataForKey()`? Would it
> not be simpler to add `Collection<StreamsMetadata>
> standbyMetadataForKey(...)`? This would align with new methods
> `#allStandbyMetadata()` and `#allStandbyMetadataForStore()`?
>
> (30) Why do we need the class `StoreLagInfo` -- it seems simpler to just
> extend `StreamsMetadata` with the corresponding attributes and methods
> (for active tasks, the lag would always be reported as zero)
>
> (32) Via (30) we can avoid the two new methods `#allLagInfo()` and
> `#lagInfoForStore()`, too, reducing public API and making it simpler to
> use the feature.
>
> Btw: If we make `StreamsMetadata` thread safe, the lag information can be
> updated in the background without the need that the application
> refreshes its metadata. Hence, the user can get active and/or standby
> metadata once, and only needs to refresh it, if a rebalance happened.
>
>
> About point (4) of the previous thread: I was also thinking about
> when/how to update the time-lag information, and I agree that we should
> not update it for each query.
>
> "How": That we need to fetch the last record is a little bit
> unfortunate, but I don't see any other way without a broker change. One
> issue I still see is with "exactly-once" -- if transaction markers are
> in the topic, the last message is not at offset "endOffset - 1" and as
> multiple transaction markers might be after each other, it's unclear how
> to identify the offset of the last record... Thoughts?
>
> Hence, it might be worth to look into a broker change as a potential
> future improvement. It might be possible that the broker caches the
> latest timestamp per partition to serve this data efficiently, similar
> to `#endOffset()`.
>
> "When": We refresh the end-offset information based on the
> `commit.interval.ms` -- doing it more often is not really useful, as
> state store caches will most likely buffer up all writes to changelogs
> anyway and are only flushed on commit (including a flush of the
> producer). Hence, I would suggest to update the time-lag information
> based on the same strategy in the 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-05 Thread Matthias J. Sax
Navinder,

thanks for updating the KIP. Couple of follow up questions:


(10) Why do we need to introduce the class `KeyQueryMetadata`?

(20) Why do we introduce the two methods `allMetadataForKey()`? Would it
not be simpler to add `Collection<StreamsMetadata>
standbyMetadataForKey(...)`? This would align with new methods
`#allStandbyMetadata()` and `#allStandbyMetadataForStore()`?

(30) Why do we need the class `StoreLagInfo` -- it seems simpler to just
extend `StreamsMetadata` with the corresponding attributes and methods
(for active tasks, the lag would always be reported as zero)

(32) Via (30) we can avoid the two new methods `#allLagInfo()` and
`#lagInfoForStore()`, too, reducing public API and making it simpler to
use the feature.
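
Just to sketch what I mean (the names are placeholders, and the maps would be
populated by Streams in the background):

// hypothetical additions to StreamsMetadata, keyed by store name
private final Map<String, Long> offsetLagByStore = new HashMap<>();
private final Map<String, Duration> timeLagByStore = new HashMap<>();

public long offsetLag(final String storeName) {
    // always 0 for stores hosted by active, running tasks
    return offsetLagByStore.getOrDefault(storeName, 0L);
}

public Duration timeLag(final String storeName) {
    return timeLagByStore.getOrDefault(storeName, Duration.ZERO);
}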

Btw: If we make `StreamsMetadata` thread safe, the lag information can be
updated in the background without the need that the application
refreshes its metadata. Hence, the user can get active and/or standby
metadata once, and only needs to refresh it, if a rebalance happened.


About point (4) of the previous thread: I was also thinking about
when/how to update the time-lag information, and I agree that we should
not update it for each query.

"How": That we need to fetch the last record is a little bit
unfortunate, but I don't see any other way without a broker change. One
issue I still see is with "exactly-once" -- if transaction markers are
in the topic, the last message is not at offset "endOffset - 1" and as
multiple transaction markers might be after each other, it's unclear how
to identify the offset of the last record... Thoughts?
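
For reference, the kind of lookup I mean looks roughly like the following
sketch (ignoring the transaction-marker problem described above, i.e. assuming
the record at "endOffset - 1" actually exists):

final long end = consumer.endOffsets(Collections.singleton(changelogPartition))
                         .get(changelogPartition);
consumer.assign(Collections.singleton(changelogPartition));
consumer.seek(changelogPartition, Math.max(0L, end - 1));
long tailTimestamp = -1L;
for (final ConsumerRecord<byte[], byte[]> record :
        consumer.poll(Duration.ofMillis(200)).records(changelogPartition)) {
    tailTimestamp = record.timestamp(); // timestamp of the changelog tail record
}
// time lag = tailTimestamp - timestamp of the last record applied to the local store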

Hence, it might be worth to look into a broker change as a potential
future improvement. It might be possible that the broker caches the
latest timestamp per partition to serve this data efficiently, similar
to `#endOffset()`.

"When": We refresh the end-offset information based on the
`commit.interval.ms` -- doing it more often is not really useful, as
state store caches will most likely buffer up all writes to changelogs
anyway and are only flushed on commit (including a flush of the
producer). Hence, I would suggest to update the time-lag information
based on the same strategy in the background. This way there is no
additional config or methods and the user does not need to worry about
it at all.

To avoid refresh overhead if we don't need it (a user might not use IQ
to begin with), it might be worth to maintain an internal flag
`updateTimeLagEnabled` that is set to `false` initially and only set to
`true` on the first call of a user to get standby-metadata.


-Matthias



On 11/4/19 5:13 PM, Vinoth Chandar wrote:
>>>  I'm having some trouble wrapping my head around what race conditions
> might occur, other than the fundamentally broken state in which different
> instances are running totally different topologies.
> 3. @both Without the topic partitions that the tasks can map back to, we
> have to rely on topology/cluster metadata in each Streams instance to map
> the task back. If the source topics are wildcarded, e.g., then each
> instance could have different source topics in topology, until the next
> rebalance happens. You can also read my comments from here
> https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106
> 
> 
>>> seems hard to imagine how encoding arbitrarily long topic names plus an
> integer for the partition number could be as efficient as task ids, which
> are just two integers.
> 3. if you still have concerns about the efficacy of dictionary encoding,
> happy to engage. The link above also has some benchmark code I used.
> Theoretically, we would send each topic name at least once, so yes if you
> compare a 10-20 character topic name + an integer to two integers, it will
> be more bytes. But it's a constant overhead proportional to the size of the topic
> name, and with 4, 8, 12 partitions the size difference between baseline (version 4
> where we just repeated topic names for each topic partition) and the two
> approaches becomes narrow.
> 
>>> Plus, Navinder is going to implement a bunch of protocol code that we
> might just want to change when the discussion actually does take place, if
> ever.
>>> it'll just be a mental burden for everyone to remember that we want to
> have this follow-up discussion.
> 3. Isn't people changing the same parts of code and tracking follow-ups a
> common thing we need to deal with anyway?  For this KIP, isn't it enough
> to reason about whether the additional map on top of the topic dictionary
> would incur more overhead than sending the task_ids? I don't think that's the
> case, both of them send two integers. As I see it, we can do a separate
> follow up to (re)pursue the task_id conversion and get it working for both
> maps within the next release?
> 
>>> Can you elaborate on "breaking up the API"? It looks like there are
> already separate API calls in the proposal, one for time-lag, and another
> for 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-05 Thread Navinder Brar
Thanks John and Vinoth for converging thoughts on AssignmentInfo.
   
   - Report the time difference between the last consumed changelog record's 
timestamp and the changelog tail record's timestamp. This is an indicator of 
how fresh the local copy of a store is with respect to the active copy. Always 
Duration.ZERO for stores in active tasks      >>    I think for restoring 
active tasks this could still be non-zero.
   - I agree if there is no use case for  allLagInfo() maybe it's not needed at 
all.

Regards,
Navinder

On Wednesday, 6 November, 2019, 09:00:39 am IST, John Roesler 
 wrote:  
 
 Hey Vinoth,

Really sorry, I just remembered that I started a reply earlier today,
but got side-tracked.

Regarding the AssignmentInfo extension:

Your explanation for this point makes sense. I was incorrectly
thinking that the cluster metadata was shared with all members, but
now I see it's only given to the assignor. I agree now that the
assignor basically has to encode this information in the userdata
field if it wants the members to have it. Thanks for your patience in
explaining and linking the relevant history.

Given this constraint, the encoding part of the discussion is moot.
Regardless, the detail you provided does make sense to me.

I'm now in favor of the proposal for extending AssignmentInfo.


Regarding "breaking up the API":

Ah, my mistake. Yes, it sounds like this would be a good idea. I just
took another look at KafkaStreams. Since there's no method for getting
all the local stores, perhaps we can skip the "get the lags for all
stores" method, and just add two new methods to the KafkaStreams
interface like this:

KafkaStreams {
// existing
 <T> T store(String storeName, QueryableStoreType<T> queryableStoreType)

// new
/* Report the current amount by which the local store lags behind the
changelog tail. This is an indicator of how fresh the local copy of a
store is with respect to the active copy. Always 0 for stores in
active tasks. */
long storeChangelogOffsetLag(String storeName)

/* Report the time difference between the last consumed changelog
record's timestamp and the changelog tail record's timestamp. This is
an indicator of how fresh the local copy of a store is with respect to
the active copy. Always Duration.ZERO for stores in active tasks. */
Duration storeChangelogTimeLag(String storeName)
}

Note, I'm not insisting on this interface, just proposing it to
potentially minimize back-and-forth. Here's the reasoning:
* Since this API is no longer reporting lags for all stores, just
local ones, it makes sense to try and stick close to the `store(name,
type)` method. This also brings the new methods down to two. If others
think there's a use case for getting all the stores' lags, then we can
also propose to add corresponding `all*Lags` methods that return
`Map`.
* I also just realized that we were proposing to add
`timeLagEstimateMs()` as a `long`, but as a project, we have a larger
evolution to migrate to `Duration` and `Instant` where applicable. I
think it makes sense to do that in this case.

How does this sound?
Thanks,
-John



On Tue, Nov 5, 2019 at 7:50 PM Vinoth Chandar  wrote:
>
> Ping :) Any thoughts?
>
> On Mon, Nov 4, 2019 at 5:13 PM Vinoth Chandar  wrote:
>
> > >>  I'm having some trouble wrapping my head around what race conditions
> > might occur, other than the fundamentally broken state in which different
> > instances are running totally different topologies.
> > 3. @both Without the topic partitions that the tasks can map back to, we
> > have to rely on topology/cluster metadata in each Streams instance to map
> > the task back. If the source topics are wild carded for e,g then each
> > instance could have different source topics in topology, until the next
> > rebalance happens. You can also read my comments from here
> > https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106
> >
> >
> > >> seems hard to imagine how encoding arbitrarily long topic names plus an
> > integer for the partition number could be as efficient as task ids, which
> > are just two integers.
> > 3. if you still have concerns about the efficacy of dictionary encoding,
> > happy to engage. The link above also has some benchmark code I used.
> > Theoretically, we would send each topic name atleast once, so yes if you
> > compare a 10-20 character topic name + an integer to two integers, it will
> > be more bytes. But its constant overhead proportional to size of topic name
> > and with 4,8,12, partitions the size difference between baseline (version 4
> > where we just repeated topic names for each topic partition) and the two
> > approaches becomes narrow.
> >
> > >>Plus, Navinder is going to implement a bunch of protocol code that we
> > might just want to change when the discussion actually does take place, if
> > ever.
> > >>it'll just be a mental burden for everyone to remember that we want 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-05 Thread John Roesler
Hey Vinoth,

Really sorry, I just remembered that I started a reply earlier today,
but got side-tracked.

Regarding the AssignmentInfo extension:

Your explanation for this point makes sense. I was incorrectly
thinking that the cluster metadata was shared with all members, but
now I see it's only given to the assignor. I agree now that the
assignor basically has to encode this information in the userdata
field if it wants the members to have it. Thanks for your patience in
explaining and linking the relevant history.

Given this constraint, the encoding part of the discussion is moot.
Regardless, the detail you provided does make sense to me.

I'm now in favor of the proposal for extending AssignmentInfo.


Regarding "breaking up the API":

Ah, my mistake. Yes, it sounds like this would be a good idea. I just
took another look at KafkaStreams. Since there's no method for getting
all the local stores, perhaps we can skip the "get the lags for all
stores" method, and just add two new methods to the KafkaStreams
interface like this:

KafkaStreams {
// existing
 <T> T store(String storeName, QueryableStoreType<T> queryableStoreType)

// new
/* Report the current amount by which the local store lags behind the
changelog tail. This is an indicator of how fresh the local copy of a
store is with respect to the active copy. Always 0 for stores in
active tasks. */
long storeChangelogOffsetLag(String storeName)

/* Report the time difference between the last consumed changelog
record's timestamp and the changelog tail record's timestamp. This is
an indicator of how fresh the local copy of a store is with respect to
the active copy. Always Duration.ZERO for stores in active tasks. */
Duration storeChangelogTimeLag(String storeName)
}
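
For example, an IQ service could use these to decide whether its local copy is
fresh enough to serve a read (the store name and thresholds below are of course
just placeholders):

final long offsetLag = streams.storeChangelogOffsetLag("orders-store");
final Duration timeLag = streams.storeChangelogTimeLag("orders-store");
if (offsetLag < 10_000L && timeLag.compareTo(Duration.ofSeconds(30)) < 0) {
    // fresh enough: serve the query from this instance's (possibly standby) copy
} else {
    // too stale: route the query to the active host instead
}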

Note, I'm not insisting on this interface, just proposing it to
potentially minimize back-and-forth. Here's the reasoning:
* Since this API is no longer reporting lags for all stores, just
local ones, it makes sense to try and stick close to the `store(name,
type)` method. This also brings the new methods down to two. If others
think there's a use case for getting all the stores' lags, then we can
also propose to add corresponding `all*Lags` methods that return
`Map`.
* I also just realized that we were proposing to add
`timeLagEstimateMs()` as a `long`, but as a project, we have a larger
evolution to migrate to `Duration` and `Instant` where applicable. I
think it makes sense to do that in this case.

How does this sound?
Thanks,
-John



On Tue, Nov 5, 2019 at 7:50 PM Vinoth Chandar  wrote:
>
> Ping :) Any thoughts?
>
> On Mon, Nov 4, 2019 at 5:13 PM Vinoth Chandar  wrote:
>
> > >>  I'm having some trouble wrapping my head around what race conditions
> > might occur, other than the fundamentally broken state in which different
> > instances are running totally different topologies.
> > 3. @both Without the topic partitions that the tasks can map back to, we
> > have to rely on topology/cluster metadata in each Streams instance to map
> > the task back. If the source topics are wild carded for e,g then each
> > instance could have different source topics in topology, until the next
> > rebalance happens. You can also read my comments from here
> > https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106
> >
> >
> > >> seems hard to imagine how encoding arbitrarily long topic names plus an
> > integer for the partition number could be as efficient as task ids, which
> > are just two integers.
> > 3. if you still have concerns about the efficacy of dictionary encoding,
> > happy to engage. The link above also has some benchmark code I used.
> > Theoretically, we would send each topic name atleast once, so yes if you
> > compare a 10-20 character topic name + an integer to two integers, it will
> > be more bytes. But its constant overhead proportional to size of topic name
> > and with 4,8,12, partitions the size difference between baseline (version 4
> > where we just repeated topic names for each topic partition) and the two
> > approaches becomes narrow.
> >
> > >>Plus, Navinder is going to implement a bunch of protocol code that we
> > might just want to change when the discussion actually does take place, if
> > ever.
> > >>it'll just be a mental burden for everyone to remember that we want to
> > have this follow-up discussion.
> > 3. Is n't people changing same parts of code and tracking follow ups a
> > common thing, we need to deal with anyway?  For this KIP, is n't it enough
> > to reason about whether the additional map on top of the topic dictionary
> > would incur more overhead than the sending task_ids? I don't think it's
> > case, both of them send two integers. As I see it, we can do a separate
> > follow up to (re)pursue the task_id conversion and get it working for both
> > maps within the next release?
> >
> > >>Can you elaborate on "breaking up the API"? It looks like there 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-05 Thread Vinoth Chandar
Ping :) Any thoughts?

On Mon, Nov 4, 2019 at 5:13 PM Vinoth Chandar  wrote:

> >>  I'm having some trouble wrapping my head around what race conditions
> might occur, other than the fundamentally broken state in which different
> instances are running totally different topologies.
> 3. @both Without the topic partitions that the tasks can map back to, we
> have to rely on topology/cluster metadata in each Streams instance to map
> the task back. If the source topics are wild carded for e,g then each
> instance could have different source topics in topology, until the next
> rebalance happens. You can also read my comments from here
> https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106
>
>
> >> seems hard to imagine how encoding arbitrarily long topic names plus an
> integer for the partition number could be as efficient as task ids, which
> are just two integers.
> 3. if you still have concerns about the efficacy of dictionary encoding,
> happy to engage. The link above also has some benchmark code I used.
> Theoretically, we would send each topic name atleast once, so yes if you
> compare a 10-20 character topic name + an integer to two integers, it will
> be more bytes. But its constant overhead proportional to size of topic name
> and with 4,8,12, partitions the size difference between baseline (version 4
> where we just repeated topic names for each topic partition) and the two
> approaches becomes narrow.
>
> >>Plus, Navinder is going to implement a bunch of protocol code that we
> might just want to change when the discussion actually does take place, if
> ever.
> >>it'll just be a mental burden for everyone to remember that we want to
> have this follow-up discussion.
> 3. Is n't people changing same parts of code and tracking follow ups a
> common thing, we need to deal with anyway?  For this KIP, is n't it enough
> to reason about whether the additional map on top of the topic dictionary
> would incur more overhead than the sending task_ids? I don't think it's
> case, both of them send two integers. As I see it, we can do a separate
> follow up to (re)pursue the task_id conversion and get it working for both
> maps within the next release?
>
> >>Can you elaborate on "breaking up the API"? It looks like there are
> already separate API calls in the proposal, one for time-lag, and another
> for offset-lag, so are they not already broken up?
> The current APIs (e.g lagInfoForStore) for lags return StoreLagInfo
> objects which has both time and offset lags. If we had separate APIs, say
> (e.g offsetLagForStore(), timeLagForStore()), we can implement offset
> version using the offset lag that the streams instance already tracks i.e
> no need for external calls. The time based lag API would incur the kafka
> read for the timestamp. makes sense?
>
> Based on the discussions so far, I only see these two pending issues to be
> aligned on. Is there any other open item people want to bring up?
>
> On Mon, Nov 4, 2019 at 11:24 AM Sophie Blee-Goldman 
> wrote:
>
>> Regarding 3) I'm wondering, does your concern still apply even now
>> that the pluggable PartitionGrouper interface has been deprecated?
>> Now that we can be sure that the DefaultPartitionGrouper is used to
>> generate
>> the taskId -> partitions mapping, we should be able to convert any taskId
>> to any
>> partitions.
>>
>> On Mon, Nov 4, 2019 at 11:17 AM John Roesler  wrote:
>>
>> > Hey Vinoth, thanks for the reply!
>> >
>> > 3.
>> > I get that it's not the main focus of this KIP, but if it's ok, it
>> > would be nice to hash out this point right now. It only came up
>> > because this KIP-535 is substantially extending the pattern in
>> > question. If we push it off until later, then the reviewers are going
>> > to have to suspend their concerns not just while voting for the KIP,
>> > but also while reviewing the code. Plus, Navinder is going to
>> > implement a bunch of protocol code that we might just want to change
>> > when the discussion actually does take place, if ever. Finally, it'll
>> > just be a mental burden for everyone to remember that we want to have
>> > this follow-up discussion.
>> >
>> > It makes sense what you say... the specific assignment is already
>> > encoded in the "main" portion of the assignment, not in the "userdata"
>> > part. It also makes sense that it's simpler to reason about races if
>> > you simply get all the information about the topics and partitions
>> > directly from the assignor, rather than get the partition number from
>> > the assignor and the topic name from your own a priori knowledge of
>> > the topology. On the other hand, I'm having some trouble wrapping my
>> > head around what race conditions might occur, other than the
>> > fundamentally broken state in which different instances are running
>> > totally different topologies. Sorry, but can you remind us of the
>> > specific condition?
>> 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-04 Thread Vinoth Chandar
>>  I'm having some trouble wrapping my head around what race conditions
might occur, other than the fundamentally broken state in which different
instances are running totally different topologies.
3. @both Without the topic partitions that the tasks can map back to, we
have to rely on topology/cluster metadata in each Streams instance to map
the task back. If the source topics are wildcarded, e.g., then each
instance could have different source topics in topology, until the next
rebalance happens. You can also read my comments from here
https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106


>> seems hard to imagine how encoding arbitrarily long topic names plus an
integer for the partition number could be as efficient as task ids, which
are just two integers.
3. if you still have concerns about the efficacy of dictionary encoding,
happy to engage. The link above also has some benchmark code I used.
Theoretically, we would send each topic name at least once, so yes if you
compare a 10-20 character topic name + an integer to two integers, it will
be more bytes. But it's a constant overhead proportional to the size of the topic
name, and with 4, 8, 12 partitions the size difference between baseline (version 4
where we just repeated topic names for each topic partition) and the two
approaches becomes narrow.
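
To make the dictionary encoding concrete, the idea is roughly the following
(the layout is illustrative, not the exact wire format, and
"allPartitionsInAssignment" just stands for every topic partition appearing in
the global assignment maps):

// each topic name is written to the dictionary exactly once...
final Map<String, Integer> topicIndex = new HashMap<>();
for (final TopicPartition tp : allPartitionsInAssignment) {
    topicIndex.putIfAbsent(tp.topic(), topicIndex.size());
}
// ...and every partition in the global maps is then just two ints
final List<int[]> encoded = new ArrayList<>();
for (final TopicPartition tp : allPartitionsInAssignment) {
    encoded.add(new int[] { topicIndex.get(tp.topic()), tp.partition() });
}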

>>Plus, Navinder is going to implement a bunch of protocol code that we
might just want to change when the discussion actually does take place, if
ever.
>>it'll just be a mental burden for everyone to remember that we want to
have this follow-up discussion.
3. Isn't people changing the same parts of code and tracking follow-ups a
common thing we need to deal with anyway?  For this KIP, isn't it enough
to reason about whether the additional map on top of the topic dictionary
would incur more overhead than sending the task_ids? I don't think that's the
case, both of them send two integers. As I see it, we can do a separate
follow up to (re)pursue the task_id conversion and get it working for both
maps within the next release?

>>Can you elaborate on "breaking up the API"? It looks like there are
already separate API calls in the proposal, one for time-lag, and another
for offset-lag, so are they not already broken up?
The current APIs (e.g. lagInfoForStore) for lags return StoreLagInfo objects
which have both time and offset lags. If we had separate APIs, say (e.g.
offsetLagForStore(), timeLagForStore()), we can implement the offset version
using the offset lag that the streams instance already tracks, i.e. no need
for external calls. The time-based lag API would incur the Kafka read for
the timestamp. Makes sense?

Based on the discussions so far, I only see these two pending issues to be
aligned on. Is there any other open item people want to bring up?

On Mon, Nov 4, 2019 at 11:24 AM Sophie Blee-Goldman 
wrote:

> Regarding 3) I'm wondering, does your concern still apply even now
> that the pluggable PartitionGrouper interface has been deprecated?
> Now that we can be sure that the DefaultPartitionGrouper is used to
> generate
> the taskId -> partitions mapping, we should be able to convert any taskId
> to any
> partitions.
>
> On Mon, Nov 4, 2019 at 11:17 AM John Roesler  wrote:
>
> > Hey Vinoth, thanks for the reply!
> >
> > 3.
> > I get that it's not the main focus of this KIP, but if it's ok, it
> > would be nice to hash out this point right now. It only came up
> > because this KIP-535 is substantially extending the pattern in
> > question. If we push it off until later, then the reviewers are going
> > to have to suspend their concerns not just while voting for the KIP,
> > but also while reviewing the code. Plus, Navinder is going to
> > implement a bunch of protocol code that we might just want to change
> > when the discussion actually does take place, if ever. Finally, it'll
> > just be a mental burden for everyone to remember that we want to have
> > this follow-up discussion.
> >
> > It makes sense what you say... the specific assignment is already
> > encoded in the "main" portion of the assignment, not in the "userdata"
> > part. It also makes sense that it's simpler to reason about races if
> > you simply get all the information about the topics and partitions
> > directly from the assignor, rather than get the partition number from
> > the assignor and the topic name from your own a priori knowledge of
> > the topology. On the other hand, I'm having some trouble wrapping my
> > head around what race conditions might occur, other than the
> > fundamentally broken state in which different instances are running
> > totally different topologies. Sorry, but can you remind us of the
> > specific condition?
> >
> > To the efficiency counterargument, it seems hard to imagine how
> > encoding arbitrarily long topic names plus an integer for the
> > partition number could be as efficient as task ids, which are just two

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-04 Thread Sophie Blee-Goldman
Regarding 3) I'm wondering, does your concern still apply even now
that the pluggable PartitionGrouper interface has been deprecated?
Now that we can be sure that the DefaultPartitionGrouper is used to generate
the taskId -> partitions mapping, we should be able to convert any taskId
to any
partitions.
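
I.e., with the DefaultPartitionGrouper semantics, something like the following
should hold ("topicGroups" here is just a stand-in for the topology's knowledge
of which source topics belong to each topic group):

Set<TopicPartition> partitionsForTask(final TaskId id,
                                      final Map<Integer, Set<String>> topicGroups) {
    final Set<TopicPartition> partitions = new HashSet<>();
    for (final String topic : topicGroups.get(id.topicGroupId)) {
        // the default grouper pairs partition N of every source topic in the group
        partitions.add(new TopicPartition(topic, id.partition));
    }
    return partitions;
}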

On Mon, Nov 4, 2019 at 11:17 AM John Roesler  wrote:

> Hey Vinoth, thanks for the reply!
>
> 3.
> I get that it's not the main focus of this KIP, but if it's ok, it
> would be nice to hash out this point right now. It only came up
> because this KIP-535 is substantially extending the pattern in
> question. If we push it off until later, then the reviewers are going
> to have to suspend their concerns not just while voting for the KIP,
> but also while reviewing the code. Plus, Navinder is going to
> implement a bunch of protocol code that we might just want to change
> when the discussion actually does take place, if ever. Finally, it'll
> just be a mental burden for everyone to remember that we want to have
> this follow-up discussion.
>
> It makes sense what you say... the specific assignment is already
> encoded in the "main" portion of the assignment, not in the "userdata"
> part. It also makes sense that it's simpler to reason about races if
> you simply get all the information about the topics and partitions
> directly from the assignor, rather than get the partition number from
> the assignor and the topic name from your own a priori knowledge of
> the topology. On the other hand, I'm having some trouble wrapping my
> head around what race conditions might occur, other than the
> fundamentally broken state in which different instances are running
> totally different topologies. Sorry, but can you remind us of the
> specific condition?
>
> To the efficiency counterargument, it seems hard to imagine how
> encoding arbitrarily long topic names plus an integer for the
> partition number could be as efficient as task ids, which are just two
> integers. It seems like this would only be true if topic names were 4
> characters or less.
>
> 4.
> Yeah, clearly, it would not be a good idea to query the metadata
> before every single IQ query. I think there are plenty of established
> patterns for distributed database clients to follow. Can you elaborate
> on "breaking up the API"? It looks like there are already separate API
> calls in the proposal, one for time-lag, and another for offset-lag,
> so are they not already broken up? FWIW, yes, I agree, the offset lag
> is already locally known, so we don't need to build in an extra
> synchronous broker API call, just one for the time-lag.
>
> Thanks again for the discussion,
> -John
>
> On Mon, Nov 4, 2019 at 11:17 AM Vinoth Chandar 
> wrote:
> >
> > 3. Right now, we still get the topic partitions assigned as a part of the
> > top level Assignment object (the one that wraps AssignmentInfo) and use
> > that to convert taskIds back. This list of only contains assignments for
> > that particular instance. Attempting to also reverse map for "all" the
> > tasksIds in the streams cluster i.e all the topic partitions in these
> > global assignment maps was what was problematic. By explicitly sending
> the
> > global assignment maps as actual topic partitions,  group coordinator
> (i.e
> > the leader that computes the assignment's ) is able to consistently
> enforce
> > its view of the topic metadata. Still don't think doing such a change
> that
> > forces you to reconsider semantics, is not needed to save bits on wire.
> May
> > be we can discuss this separately from this KIP?
> >
> > 4. There needs to be some caching/interval somewhere though since we
> don't
> > want to make 1 kafka read per 1 IQ potentially. But I think its a valid
> > suggestion, to make this call just synchronous and leave the caching or
> how
> > often you want to call to the application. Would it be good to then break
> > up the APIs for time and offset based lag?  We can obtain offset based
> lag
> > for free? Only incur the overhead of reading kafka if we want time
> > based lags?
> >
> > On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman 
> > wrote:
> >
> > > Adding on to John's response to 3), can you clarify when and why
> exactly we
> > > cannot
> > > convert between taskIds and partitions? If that's really the case I
> don't
> > > feel confident
> > > that the StreamsPartitionAssignor is not full of bugs...
> > >
> > > It seems like it currently just encodes a list of all partitions (the
> > > assignment) and also
> > > a list of the corresponding task ids, duplicated to ensure each
> partition
> > > has the corresponding
> > > taskId at the same offset into the list. Why is that problematic?
> > >
> > >
> > > On Fri, Nov 1, 2019 at 12:39 PM John Roesler 
> wrote:
> > >
> > > > Thanks, all, for considering the points!
> > > >
> > > > 3. Interesting. I have a vague recollection of that... Still, though,
> > > > it seems a little fishy. After all, we return the assignments
> > > > 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-04 Thread John Roesler
Hey Vinoth, thanks for the reply!

3.
I get that it's not the main focus of this KIP, but if it's ok, it
would be nice to hash out this point right now. It only came up
because this KIP-535 is substantially extending the pattern in
question. If we push it off until later, then the reviewers are going
to have to suspend their concerns not just while voting for the KIP,
but also while reviewing the code. Plus, Navinder is going to
implement a bunch of protocol code that we might just want to change
when the discussion actually does take place, if ever. Finally, it'll
just be a mental burden for everyone to remember that we want to have
this follow-up discussion.

It makes sense what you say... the specific assignment is already
encoded in the "main" portion of the assignment, not in the "userdata"
part. It also makes sense that it's simpler to reason about races if
you simply get all the information about the topics and partitions
directly from the assignor, rather than get the partition number from
the assignor and the topic name from your own a priori knowledge of
the topology. On the other hand, I'm having some trouble wrapping my
head around what race conditions might occur, other than the
fundamentally broken state in which different instances are running
totally different topologies. Sorry, but can you remind us of the
specific condition?

To the efficiency counterargument, it seems hard to imagine how
encoding arbitrarily long topic names plus an integer for the
partition number could be as efficient as task ids, which are just two
integers. It seems like this would only be true if topic names were 4
characters or less.

4.
Yeah, clearly, it would not be a good idea to query the metadata
before every single IQ query. I think there are plenty of established
patterns for distributed database clients to follow. Can you elaborate
on "breaking up the API"? It looks like there are already separate API
calls in the proposal, one for time-lag, and another for offset-lag,
so are they not already broken up? FWIW, yes, I agree, the offset lag
is already locally known, so we don't need to build in an extra
synchronous broker API call, just one for the time-lag.

Thanks again for the discussion,
-John

On Mon, Nov 4, 2019 at 11:17 AM Vinoth Chandar  wrote:
>
> 3. Right now, we still get the topic partitions assigned as a part of the
> top level Assignment object (the one that wraps AssignmentInfo) and use
> that to convert taskIds back. This list of only contains assignments for
> that particular instance. Attempting to also reverse map for "all" the
> tasksIds in the streams cluster i.e all the topic partitions in these
> global assignment maps was what was problematic. By explicitly sending the
> global assignment maps as actual topic partitions,  group coordinator (i.e
> the leader that computes the assignment's ) is able to consistently enforce
> its view of the topic metadata. Still don't think doing such a change that
> forces you to reconsider semantics, is not needed to save bits on wire. May
> be we can discuss this separately from this KIP?
>
> 4. There needs to be some caching/interval somewhere though since we don't
> want to make 1 kafka read per 1 IQ potentially. But I think its a valid
> suggestion, to make this call just synchronous and leave the caching or how
> often you want to call to the application. Would it be good to then break
> up the APIs for time and offset based lag?  We can obtain offset based lag
> for free? Only incur the overhead of reading kafka if we want time
> based lags?
>
> On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman 
> wrote:
>
> > Adding on to John's response to 3), can you clarify when and why exactly we
> > cannot
> > convert between taskIds and partitions? If that's really the case I don't
> > feel confident
> > that the StreamsPartitionAssignor is not full of bugs...
> >
> > It seems like it currently just encodes a list of all partitions (the
> > assignment) and also
> > a list of the corresponding task ids, duplicated to ensure each partition
> > has the corresponding
> > taskId at the same offset into the list. Why is that problematic?
> >
> >
> > On Fri, Nov 1, 2019 at 12:39 PM John Roesler  wrote:
> >
> > > Thanks, all, for considering the points!
> > >
> > > 3. Interesting. I have a vague recollection of that... Still, though,
> > > it seems a little fishy. After all, we return the assignments
> > > themselves as task ids, and the members have to map these to topic
> > > partitions in order to configure themselves properly. If it's too
> > > complicated to get this right, then how do we know that Streams is
> > > computing the correct partitions at all?
> > >
> > > 4. How about just checking the log-end timestamp when you call the
> > > method? Then, when you get an answer, it's as fresh as it could
> > > possibly be. And as a user you have just one, obvious, "knob" to
> > > configure how much overhead you want to devote to checking... If you
> > > want to 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-04 Thread Vinoth Chandar
3. Right now, we still get the topic partitions assigned as a part of the
top level Assignment object (the one that wraps AssignmentInfo) and use
that to convert taskIds back. This list only contains assignments for
that particular instance. Attempting to also reverse map for "all" the
taskIds in the streams cluster, i.e. all the topic partitions in these
global assignment maps, was what was problematic. By explicitly sending the
global assignment maps as actual topic partitions, the group coordinator (i.e.
the leader that computes the assignments) is able to consistently enforce
its view of the topic metadata. I still don't think doing such a change, which
forces you to reconsider semantics, is needed just to save bits on the wire.
Maybe we can discuss this separately from this KIP?

4. There needs to be some caching/interval somewhere though since we don't
want to make 1 Kafka read per 1 IQ potentially. But I think it's a valid
suggestion to make this call just synchronous and leave the caching, or how
often you want to call, to the application. Would it be good to then break
up the APIs for time- and offset-based lag?  We can obtain offset-based lag
for free, and only incur the overhead of reading Kafka if we want time-based
lags?
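
Concretely, the split I have in mind would look roughly like this (the method
names are placeholders):

// served purely from what the instance already tracks -- no broker round-trip
long offsetLagForStore(final String storeName);

// needs to read the changelog tail record, so callers pay that cost explicitly
// and decide themselves how often to invoke it
Duration timeLagForStore(final String storeName);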

On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman 
wrote:

> Adding on to John's response to 3), can you clarify when and why exactly we
> cannot
> convert between taskIds and partitions? If that's really the case I don't
> feel confident
> that the StreamsPartitionAssignor is not full of bugs...
>
> It seems like it currently just encodes a list of all partitions (the
> assignment) and also
> a list of the corresponding task ids, duplicated to ensure each partition
> has the corresponding
> taskId at the same offset into the list. Why is that problematic?
>
>
> On Fri, Nov 1, 2019 at 12:39 PM John Roesler  wrote:
>
> > Thanks, all, for considering the points!
> >
> > 3. Interesting. I have a vague recollection of that... Still, though,
> > it seems a little fishy. After all, we return the assignments
> > themselves as task ids, and the members have to map these to topic
> > partitions in order to configure themselves properly. If it's too
> > complicated to get this right, then how do we know that Streams is
> > computing the correct partitions at all?
> >
> > 4. How about just checking the log-end timestamp when you call the
> > method? Then, when you get an answer, it's as fresh as it could
> > possibly be. And as a user you have just one, obvious, "knob" to
> > configure how much overhead you want to devote to checking... If you
> > want to call the broker API less frequently, you just call the Streams
> > API less frequently. And you don't have to worry about the
> > relationship between your invocations of that method and the config
> > setting (e.g., you'll never get a negative number, which you could if
> > you check the log-end timestamp less frequently than you check the
> > lag).
> >
> > Thanks,
> > -John
> >
> > On Thu, Oct 31, 2019 at 11:52 PM Navinder Brar
> >  wrote:
> > >
> > > Thanks John for going through this.
> > >
> > >- +1, makes sense
> > >- +1, no issues there
> > >- Yeah the initial patch I had submitted for K-7149(
> > https://github.com/apache/kafka/pull/6935) to reduce assignmentInfo
> > object had taskIds but the merged PR had similar size according to Vinoth
> > and it was simpler so if the end result is of same size, it would not
> make
> > sense to pivot from dictionary and again move to taskIDs.
> > >- Not sure about what a good default would be if we don't have a
> > configurable setting. This gives the users the flexibility to the users
> to
> > serve their requirements as at the end of the day it would take CPU
> cycles.
> > I am ok with starting it with a default and see how it goes based upon
> > feedback.
> > >
> > > Thanks,
> > > Navinder
> > > On Friday, 1 November, 2019, 03:46:42 am IST, Vinoth Chandar <
> > vchan...@confluent.io> wrote:
> > >
> > >  1. Was trying to spell them out separately. but makes sense for
> > > readability. done
> > >
> > > 2. No I immediately agree :) .. makes sense. @navinder?
> > >
> > > 3. I actually attempted only sending taskIds while working on
> KAFKA-7149.
> > > Its non-trivial to handle edges cases resulting from newly added topic
> > > partitions and wildcarded topic entries. I ended up simplifying it to
> > just
> > > dictionary encoding the topic names to reduce size. We can apply the
> same
> > > technique here for this map. Additionally, we could also dictionary
> > encode
> > > HostInfo, given its now repeated twice. I think this would save more
> > space
> > > than having a flag per topic partition entry. Lmk if you are okay with
> > > this.
> > >
> > > 4. This opens up a good discussion. Given we support time lag estimates
> > > also, we need to read the tail record of the changelog periodically
> > (unlike
> > > offset lag, which we can potentially piggyback on metadata in
> > > 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-01 Thread Sophie Blee-Goldman
Adding on to John's response to 3), can you clarify when and why exactly we
cannot convert between taskIds and partitions? If that's really the case I
don't feel confident that the StreamsPartitionAssignor is not full of bugs...

It seems like it currently just encodes a list of all partitions (the
assignment) and also a list of the corresponding task ids, duplicated to
ensure each partition has the corresponding taskId at the same offset into
the list. Why is that problematic?


On Fri, Nov 1, 2019 at 12:39 PM John Roesler  wrote:

> Thanks, all, for considering the points!
>
> 3. Interesting. I have a vague recollection of that... Still, though,
> it seems a little fishy. After all, we return the assignments
> themselves as task ids, and the members have to map these to topic
> partitions in order to configure themselves properly. If it's too
> complicated to get this right, then how do we know that Streams is
> computing the correct partitions at all?
>
> 4. How about just checking the log-end timestamp when you call the
> method? Then, when you get an answer, it's as fresh as it could
> possibly be. And as a user you have just one, obvious, "knob" to
> configure how much overhead you want to devote to checking... If you
> want to call the broker API less frequently, you just call the Streams
> API less frequently. And you don't have to worry about the
> relationship between your invocations of that method and the config
> setting (e.g., you'll never get a negative number, which you could if
> you check the log-end timestamp less frequently than you check the
> lag).
>
> Thanks,
> -John
>
> On Thu, Oct 31, 2019 at 11:52 PM Navinder Brar
>  wrote:
> >
> > Thanks John for going through this.
> >
> >- +1, makes sense
> >- +1, no issues there
> >- Yeah the initial patch I had submitted for K-7149(
> https://github.com/apache/kafka/pull/6935) to reduce assignmentInfo
> object had taskIds but the merged PR had similar size according to Vinoth
> and it was simpler so if the end result is of same size, it would not make
> sense to pivot from dictionary and again move to taskIDs.
> >- Not sure about what a good default would be if we don't have a
> configurable setting. This gives users the flexibility to serve their
> requirements, as at the end of the day it would take CPU cycles.
> I am ok with starting with a default and seeing how it goes based upon
> feedback.
> >
> > Thanks,
> > Navinder
> > On Friday, 1 November, 2019, 03:46:42 am IST, Vinoth Chandar <
> vchan...@confluent.io> wrote:
> >
> >  1. Was trying to spell them out separately. but makes sense for
> > readability. done
> >
> > 2. No I immediately agree :) .. makes sense. @navinder?
> >
> > 3. I actually attempted only sending taskIds while working on KAFKA-7149.
> > Its non-trivial to handle edges cases resulting from newly added topic
> > partitions and wildcarded topic entries. I ended up simplifying it to
> just
> > dictionary encoding the topic names to reduce size. We can apply the same
> > technique here for this map. Additionally, we could also dictionary
> encode
> > HostInfo, given its now repeated twice. I think this would save more
> space
> > than having a flag per topic partition entry. Lmk if you are okay with
> > this.
> >
> > 4. This opens up a good discussion. Given we support time lag estimates
> > also, we need to read the tail record of the changelog periodically
> (unlike
> > offset lag, which we can potentially piggyback on metadata in
> > ConsumerRecord IIUC). we thought we should have a config that control how
> > often this read happens? Let me know if there is a simple way to get
> > timestamp value of the tail record that we are missing.
> >
> > On Thu, Oct 31, 2019 at 12:58 PM John Roesler  wrote:
> >
> > > Hey Navinder,
> > >
> > > Thanks for updating the KIP, it's a lot easier to see the current
> > > state of the proposal now.
> > >
> > > A few remarks:
> > > 1. I'm sure it was just an artifact of revisions, but you have two
> > > separate sections where you list additions to the KafkaStreams
> > > interface. Can you consolidate those so we can see all the additions
> > > at once?
> > >
> > > 2. For messageLagEstimate, can I suggest "offsetLagEstimate" instead,
> > > to be clearer that we're specifically measuring a number of offsets?
> > > If you don't immediately agree, then I'd at least point out that we
> > > usually refer to elements of Kafka topics as "records", not
> > > "messages", so "recordLagEstimate" might be more appropriate.
> > >
> > > 3. The proposal mentions adding a map of the standby _partitions_ for
> > > each host to AssignmentInfo. I assume this is designed to mirror the
> > > existing "partitionsByHost" map. To keep the size of these metadata
> > > messages down, maybe we can consider making two changes:
> > > (a) for both actives and standbys, encode the _task ids_ instead of
> > > _partitions_. Every member of the cluster has a copy of the topology,
> > > 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-01 Thread John Roesler
Thanks, all, for considering the points!

3. Interesting. I have a vague recollection of that... Still, though,
it seems a little fishy. After all, we return the assignments
themselves as task ids, and the members have to map these to topic
partitions in order to configure themselves properly. If it's too
complicated to get this right, then how do we know that Streams is
computing the correct partitions at all?

4. How about just checking the log-end timestamp when you call the
method? Then, when you get an answer, it's as fresh as it could
possibly be. And as a user you have just one, obvious, "knob" to
configure how much overhead you want to devote to checking... If you
want to call the broker API less frequently, you just call the Streams
API less frequently. And you don't have to worry about the
relationship between your invocations of that method and the config
setting (e.g., you'll never get a negative number, which you could if
you check the log-end timestamp less frequently than you check the
lag).
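
As a rough illustration of that on-demand approach (class and method names here are hypothetical, not part of the KIP), the offset lag for a store's changelog partition can be computed at call time from the broker's end offset and the locally applied offset:

import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Sketch: compute offset lag when the caller asks for it, so freshness is bounded
// only by how often the application invokes the method (no refresh-interval config).
class OnDemandOffsetLag {
    static long offsetLag(Consumer<byte[], byte[]> consumer,
                          TopicPartition changelogPartition,
                          long locallyRestoredOffset) {
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Set.of(changelogPartition));
        return Math.max(0L, endOffsets.get(changelogPartition) - locallyRestoredOffset);
    }
}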

Thanks,
-John

On Thu, Oct 31, 2019 at 11:52 PM Navinder Brar
 wrote:
>
> Thanks John for going through this.
>
>- +1, makes sense
>- +1, no issues there
>- Yeah the initial patch I had submitted for 
> K-7149(https://github.com/apache/kafka/pull/6935) to reduce assignmentInfo 
> object had taskIds but the merged PR had similar size according to Vinoth and 
> it was simpler so if the end result is of same size, it would not make sense 
> to pivot from dictionary and again move to taskIDs.
>- Not sure about what a good default would be if we don't have a 
> configurable setting. This gives users the flexibility to serve their
> requirements, as at the end of the day it would take CPU cycles. I
> am ok with starting with a default and seeing how it goes based upon feedback.
>
> Thanks,
> Navinder
> On Friday, 1 November, 2019, 03:46:42 am IST, Vinoth Chandar 
>  wrote:
>
>  1. Was trying to spell them out separately. but makes sense for
> readability. done
>
> 2. No I immediately agree :) .. makes sense. @navinder?
>
> 3. I actually attempted only sending taskIds while working on KAFKA-7149.
> Its non-trivial to handle edges cases resulting from newly added topic
> partitions and wildcarded topic entries. I ended up simplifying it to just
> dictionary encoding the topic names to reduce size. We can apply the same
> technique here for this map. Additionally, we could also dictionary encode
> HostInfo, given its now repeated twice. I think this would save more space
> than having a flag per topic partition entry. Lmk if you are okay with
> this.
>
> 4. This opens up a good discussion. Given we support time lag estimates
> also, we need to read the tail record of the changelog periodically (unlike
> offset lag, which we can potentially piggyback on metadata in
> ConsumerRecord IIUC). we thought we should have a config that control how
> often this read happens? Let me know if there is a simple way to get
> timestamp value of the tail record that we are missing.
>
> On Thu, Oct 31, 2019 at 12:58 PM John Roesler  wrote:
>
> > Hey Navinder,
> >
> > Thanks for updating the KIP, it's a lot easier to see the current
> > state of the proposal now.
> >
> > A few remarks:
> > 1. I'm sure it was just an artifact of revisions, but you have two
> > separate sections where you list additions to the KafkaStreams
> > interface. Can you consolidate those so we can see all the additions
> > at once?
> >
> > 2. For messageLagEstimate, can I suggest "offsetLagEstimate" instead,
> > to be clearer that we're specifically measuring a number of offsets?
> > If you don't immediately agree, then I'd at least point out that we
> > usually refer to elements of Kafka topics as "records", not
> > "messages", so "recordLagEstimate" might be more appropriate.
> >
> > 3. The proposal mentions adding a map of the standby _partitions_ for
> > each host to AssignmentInfo. I assume this is designed to mirror the
> > existing "partitionsByHost" map. To keep the size of these metadata
> > messages down, maybe we can consider making two changes:
> > (a) for both actives and standbys, encode the _task ids_ instead of
> > _partitions_. Every member of the cluster has a copy of the topology,
> > so they can convert task ids into specific partitions on their own,
> > and task ids are only (usually) three characters.
> > (b) instead of encoding two maps (hostinfo -> actives AND hostinfo ->
> > standbys), which requires serializing all the hostinfos twice, maybe
> > we can pack them together in one map with a structured value (hostinfo
> > -> [actives,standbys]).
> > Both of these ideas still require bumping the protocol version to 6,
> > and they basically mean we drop the existing `PartitionsByHost` field
> > and add a new `TasksByHost` field with the structured value I
> > mentioned.
> >
> > 4. Can we avoid adding the new "lag refresh" config? The lags would
> > necessarily be approximate anyway, so adding the 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-31 Thread Navinder Brar
Thanks John for going through this.
   
   - +1, makes sense
   - +1, no issues there
   - Yeah, the initial patch I had submitted for 
K-7149 (https://github.com/apache/kafka/pull/6935) to reduce the AssignmentInfo 
object used taskIds, but the merged PR had a similar size according to Vinoth and 
was simpler, so if the end result is of the same size, it would not make sense to 
pivot from dictionary encoding and again move to taskIDs.
   - Not sure what a good default would be if we don't have a 
configurable setting. This gives users the flexibility to serve their 
requirements, as at the end of the day it would take CPU cycles. I 
am ok with starting with a default and seeing how it goes based upon feedback.

Thanks,
Navinder
On Friday, 1 November, 2019, 03:46:42 am IST, Vinoth Chandar 
 wrote:  
 
 1. Was trying to spell them out separately. but makes sense for
readability. done

2. No I immediately agree :) .. makes sense. @navinder?

3. I actually attempted only sending taskIds while working on KAFKA-7149.
Its non-trivial to handle edges cases resulting from newly added topic
partitions and wildcarded topic entries. I ended up simplifying it to just
dictionary encoding the topic names to reduce size. We can apply the same
technique here for this map. Additionally, we could also dictionary encode
HostInfo, given its now repeated twice. I think this would save more space
than having a flag per topic partition entry. Lmk if you are okay with
this.

4. This opens up a good discussion. Given we support time lag estimates
also, we need to read the tail record of the changelog periodically (unlike
offset lag, which we can potentially piggyback on metadata in
ConsumerRecord IIUC). we thought we should have a config that control how
often this read happens? Let me know if there is a simple way to get
timestamp value of the tail record that we are missing.

On Thu, Oct 31, 2019 at 12:58 PM John Roesler  wrote:

> Hey Navinder,
>
> Thanks for updating the KIP, it's a lot easier to see the current
> state of the proposal now.
>
> A few remarks:
> 1. I'm sure it was just an artifact of revisions, but you have two
> separate sections where you list additions to the KafkaStreams
> interface. Can you consolidate those so we can see all the additions
> at once?
>
> 2. For messageLagEstimate, can I suggest "offsetLagEstimate" instead,
> to be clearer that we're specifically measuring a number of offsets?
> If you don't immediately agree, then I'd at least point out that we
> usually refer to elements of Kafka topics as "records", not
> "messages", so "recordLagEstimate" might be more appropriate.
>
> 3. The proposal mentions adding a map of the standby _partitions_ for
> each host to AssignmentInfo. I assume this is designed to mirror the
> existing "partitionsByHost" map. To keep the size of these metadata
> messages down, maybe we can consider making two changes:
> (a) for both actives and standbys, encode the _task ids_ instead of
> _partitions_. Every member of the cluster has a copy of the topology,
> so they can convert task ids into specific partitions on their own,
> and task ids are only (usually) three characters.
> (b) instead of encoding two maps (hostinfo -> actives AND hostinfo ->
> standbys), which requires serializing all the hostinfos twice, maybe
> we can pack them together in one map with a structured value (hostinfo
> -> [actives,standbys]).
> Both of these ideas still require bumping the protocol version to 6,
> and they basically mean we drop the existing `PartitionsByHost` field
> and add a new `TasksByHost` field with the structured value I
> mentioned.
>
> 4. Can we avoid adding the new "lag refresh" config? The lags would
> necessarily be approximate anyway, so adding the config seems to
> increase the operational complexity of the system for little actual
> benefit.
>
> Thanks for the pseudocode, by the way, it really helps visualize how
> these new interfaces would play together. And thanks again for the
> update!
> -John
>
> On Thu, Oct 31, 2019 at 2:41 PM John Roesler  wrote:
> >
> > Hey Vinoth,
> >
> > I started going over the KIP again yesterday. There are a lot of
> > updates, and I didn't finish my feedback in one day. I'm working on it
> > now.
> >
> > Thanks,
> > John
> >
> > On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar 
> wrote:
> > >
> > > Wondering if anyone has thoughts on these changes? I liked that the new
> > > metadata fetch APIs provide all the information at once with consistent
> > > naming..
> > >
> > > Any guidance on what you would like to be discussed or fleshed out more
> > > before we call a VOTE?
> > >
> > > On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar
> > >  wrote:
> > >
> > > > Hi,
> > > > We have made some edits in the KIP(
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
> )
> > > > after due deliberation on the agreed design to support the new query
> > > > design. This 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-31 Thread Vinoth Chandar
1. Was trying to spell them out separately. but makes sense for
readability. done

2. No, I immediately agree :) .. makes sense. @navinder?

3. I actually attempted only sending taskIds while working on KAFKA-7149.
It's non-trivial to handle edge cases resulting from newly added topic
partitions and wildcarded topic entries. I ended up simplifying it to just
dictionary-encoding the topic names to reduce size. We can apply the same
technique here for this map. Additionally, we could also dictionary-encode
HostInfo, given it's now repeated twice. I think this would save more space
than having a flag per topic partition entry. Let me know if you are okay with
this.
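
For reference, dictionary encoding here just means assigning each repeated string (a topic name, or a serialized HostInfo) a small integer index, writing the index on the wire, and shipping the dictionary once. A minimal, self-contained sketch, independent of the actual AssignmentInfo wire format:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class Dictionary {
    private final Map<String, Integer> indexOf = new HashMap<>();
    private final List<String> entries = new ArrayList<>();

    // Returns a small integer to serialize in place of the (possibly long, repeated) string.
    int encode(String value) {
        return indexOf.computeIfAbsent(value, v -> {
            entries.add(v);
            return entries.size() - 1;
        });
    }

    // The decoder rebuilds the string from the index using the dictionary shipped once.
    String decode(int index) {
        return entries.get(index);
    }
}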

4. This opens up a good discussion. Given we support time lag estimates
as well, we need to read the tail record of the changelog periodically (unlike
offset lag, which we can potentially piggyback on the metadata in
ConsumerRecord, IIUC), so we thought we should have a config that controls how
often this read happens. Let me know if there is a simple way to get the
timestamp value of the tail record that we are missing.
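
To make the cost concrete: getting the tail record's timestamp means seeking to endOffset - 1 on the changelog and fetching one record, which is what motivates a refresh interval. A minimal sketch, assuming a dedicated consumer with manual assignment (separate from Streams' own consumers):

import java.time.Duration;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

class TailRecordTimestamp {
    // Sketch: read the newest changelog record and return its timestamp, or -1 if
    // the partition is empty or nothing is returned within the poll timeout.
    static long tailTimestamp(Consumer<byte[], byte[]> consumer, TopicPartition changelog) {
        long endOffset = consumer.endOffsets(Set.of(changelog)).get(changelog);
        if (endOffset == 0) {
            return -1L;
        }
        consumer.assign(Set.of(changelog));
        consumer.seek(changelog, endOffset - 1);
        for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(500)).records(changelog)) {
            return record.timestamp();
        }
        return -1L;
    }
}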

On Thu, Oct 31, 2019 at 12:58 PM John Roesler  wrote:

> Hey Navinder,
>
> Thanks for updating the KIP, it's a lot easier to see the current
> state of the proposal now.
>
> A few remarks:
> 1. I'm sure it was just an artifact of revisions, but you have two
> separate sections where you list additions to the KafkaStreams
> interface. Can you consolidate those so we can see all the additions
> at once?
>
> 2. For messageLagEstimate, can I suggest "offsetLagEstimate" instead,
> to be clearer that we're specifically measuring a number of offsets?
> If you don't immediately agree, then I'd at least point out that we
> usually refer to elements of Kafka topics as "records", not
> "messages", so "recordLagEstimate" might be more appropriate.
>
> 3. The proposal mentions adding a map of the standby _partitions_ for
> each host to AssignmentInfo. I assume this is designed to mirror the
> existing "partitionsByHost" map. To keep the size of these metadata
> messages down, maybe we can consider making two changes:
> (a) for both actives and standbys, encode the _task ids_ instead of
> _partitions_. Every member of the cluster has a copy of the topology,
> so they can convert task ids into specific partitions on their own,
> and task ids are only (usually) three characters.
> (b) instead of encoding two maps (hostinfo -> actives AND hostinfo ->
> standbys), which requires serializing all the hostinfos twice, maybe
> we can pack them together in one map with a structured value (hostinfo
> -> [actives,standbys]).
> Both of these ideas still require bumping the protocol version to 6,
> and they basically mean we drop the existing `PartitionsByHost` field
> and add a new `TasksByHost` field with the structured value I
> mentioned.
>
> 4. Can we avoid adding the new "lag refresh" config? The lags would
> necessarily be approximate anyway, so adding the config seems to
> increase the operational complexity of the system for little actual
> benefit.
>
> Thanks for the pseudocode, by the way, it really helps visualize how
> these new interfaces would play together. And thanks again for the
> update!
> -John
>
> On Thu, Oct 31, 2019 at 2:41 PM John Roesler  wrote:
> >
> > Hey Vinoth,
> >
> > I started going over the KIP again yesterday. There are a lot of
> > updates, and I didn't finish my feedback in one day. I'm working on it
> > now.
> >
> > Thanks,
> > John
> >
> > On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar 
> wrote:
> > >
> > > Wondering if anyone has thoughts on these changes? I liked that the new
> > > metadata fetch APIs provide all the information at once with consistent
> > > naming..
> > >
> > > Any guidance on what you would like to be discussed or fleshed out more
> > > before we call a VOTE?
> > >
> > > On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar
> > >  wrote:
> > >
> > > > Hi,
> > > > We have made some edits in the KIP(
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
> )
> > > > after due deliberation on the agreed design to support the new query
> > > > design. This includes the new public API to query offset/time lag
> > > > information and other details related to querying standby tasks
> which have
> > > > come up after thinking of thorough details.
> > > >
> > > >
> > > >
> > > >- Addition of new config, “lag.fetch.interval.ms” to configure
> the
> > > > interval of time/offset lag
> > > >- Addition of new class StoreLagInfo to store the periodically
> obtained
> > > > time/offset lag
> > > >- Addition of two new functions in KafkaStreams,
> List
> > > > allLagInfo() and List lagInfoForStore(String
> storeName) to
> > > > return the lag information for an instance and a store respectively
> > > >- Addition of new class KeyQueryMetadata. We need topicPartition
> for
> > > > each key to be matched with the lag API for the topic partition. One
> way is
> 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-31 Thread John Roesler
Hey Navinder,

Thanks for updating the KIP, it's a lot easier to see the current
state of the proposal now.

A few remarks:
1. I'm sure it was just an artifact of revisions, but you have two
separate sections where you list additions to the KafkaStreams
interface. Can you consolidate those so we can see all the additions
at once?

2. For messageLagEstimate, can I suggest "offsetLagEstimate" instead,
to be clearer that we're specifically measuring a number of offsets?
If you don't immediately agree, then I'd at least point out that we
usually refer to elements of Kafka topics as "records", not
"messages", so "recordLagEstimate" might be more appropriate.

3. The proposal mentions adding a map of the standby _partitions_ for
each host to AssignmentInfo. I assume this is designed to mirror the
existing "partitionsByHost" map. To keep the size of these metadata
messages down, maybe we can consider making two changes:
(a) for both actives and standbys, encode the _task ids_ instead of
_partitions_. Every member of the cluster has a copy of the topology,
so they can convert task ids into specific partitions on their own,
and task ids are only (usually) three characters.
(b) instead of encoding two maps (hostinfo -> actives AND hostinfo ->
standbys), which requires serializing all the hostinfos twice, maybe
we can pack them together in one map with a structured value (hostinfo
-> [actives,standbys]).
Both of these ideas still require bumping the protocol version to 6,
and they basically mean we drop the existing `PartitionsByHost` field
and add a new `TasksByHost` field with the structured value I
mentioned.
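
A sketch of what that structured value could look like (class and field names are illustrative, not the KIP's):

import java.util.Set;
import org.apache.kafka.streams.processor.TaskId;

// Per-host entry packing active and standby task ids together, so each HostInfo is
// serialized only once and compact task ids (e.g. "1_0") replace full topic-partition names.
// The assignment metadata would then carry one Map<HostInfo, HostTasks> instead of two
// separate Map<HostInfo, Set<TopicPartition>> maps.
class HostTasks {
    final Set<TaskId> activeTasks;
    final Set<TaskId> standbyTasks;

    HostTasks(Set<TaskId> activeTasks, Set<TaskId> standbyTasks) {
        this.activeTasks = activeTasks;
        this.standbyTasks = standbyTasks;
    }
}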

4. Can we avoid adding the new "lag refresh" config? The lags would
necessarily be approximate anyway, so adding the config seems to
increase the operational complexity of the system for little actual
benefit.

Thanks for the pseudocode, by the way, it really helps visualize how
these new interfaces would play together. And thanks again for the
update!
-John

On Thu, Oct 31, 2019 at 2:41 PM John Roesler  wrote:
>
> Hey Vinoth,
>
> I started going over the KIP again yesterday. There are a lot of
> updates, and I didn't finish my feedback in one day. I'm working on it
> now.
>
> Thanks,
> John
>
> On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar  wrote:
> >
> > Wondering if anyone has thoughts on these changes? I liked that the new
> > metadata fetch APIs provide all the information at once with consistent
> > naming..
> >
> > Any guidance on what you would like to be discussed or fleshed out more
> > before we call a VOTE?
> >
> > On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar
> >  wrote:
> >
> > > Hi,
> > > We have made some edits in the KIP(
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance)
> > > after due deliberation on the agreed design to support the new query
> > > design. This includes the new public API to query offset/time lag
> > > information and other details related to querying standby tasks which have
> > > come up after thinking of thorough details.
> > >
> > >
> > >
> > >- Addition of new config, “lag.fetch.interval.ms” to configure the
> > > interval of time/offset lag
> > >- Addition of new class StoreLagInfo to store the periodically obtained
> > > time/offset lag
> > >- Addition of two new functions in KafkaStreams, List
> > > allLagInfo() and List lagInfoForStore(String storeName) to
> > > return the lag information for an instance and a store respectively
> > >- Addition of new class KeyQueryMetadata. We need topicPartition for
> > > each key to be matched with the lag API for the topic partition. One way 
> > > is
> > > to add new functions and fetch topicPartition from StreamsMetadataState 
> > > but
> > > we thought having one call and fetching StreamsMetadata and topicPartition
> > > is more cleaner.
> > >-
> > > Renaming partitionsForHost to activePartitionsForHost in 
> > > StreamsMetadataState
> > > and partitionsByHostState to activePartitionsByHostState
> > > in StreamsPartitionAssignor
> > >- We have also added the pseudo code of how all the changes will exist
> > > together and support the new querying APIs
> > >
> > > Please let me know if anything is pending now, before a vote can be
> > > started on this.   On Saturday, 26 October, 2019, 05:41:44 pm IST, 
> > > Navinder
> > > Brar  wrote:
> > >
> > >  >> Since there are two soft votes for separate active/standby API
> > > methods, I also change my position on that. Fine with 2 separate
> > > methods. Once we remove the lag information from these APIs, returning a
> > > List is less attractive, since the ordering has no special meaning now.
> > > Agreed, now that we are not returning lag, I am also sold on having two
> > > separate functions. We already have one which returns streamsMetadata for
> > > active tasks, and now we can add another one for standbys.
> > >
> > >
> > >
> > > On Saturday, 26 October, 2019, 03:55:16 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-31 Thread John Roesler
Hey Vinoth,

I started going over the KIP again yesterday. There are a lot of
updates, and I didn't finish my feedback in one day. I'm working on it
now.

Thanks,
John

On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar  wrote:
>
> Wondering if anyone has thoughts on these changes? I liked that the new
> metadata fetch APIs provide all the information at once with consistent
> naming..
>
> Any guidance on what you would like to be discussed or fleshed out more
> before we call a VOTE?
>
> On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar
>  wrote:
>
> > Hi,
> > We have made some edits in the KIP(
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance)
> > after due deliberation on the agreed design to support the new query
> > design. This includes the new public API to query offset/time lag
> > information and other details related to querying standby tasks which have
> > come up after thinking of thorough details.
> >
> >
> >
> >- Addition of new config, “lag.fetch.interval.ms” to configure the
> > interval of time/offset lag
> >- Addition of new class StoreLagInfo to store the periodically obtained
> > time/offset lag
> >- Addition of two new functions in KafkaStreams, List
> > allLagInfo() and List lagInfoForStore(String storeName) to
> > return the lag information for an instance and a store respectively
> >- Addition of new class KeyQueryMetadata. We need topicPartition for
> > each key to be matched with the lag API for the topic partition. One way is
> > to add new functions and fetch topicPartition from StreamsMetadataState but
> > we thought having one call and fetching StreamsMetadata and topicPartition
> > is more cleaner.
> >-
> > Renaming partitionsForHost to activePartitionsForHost in 
> > StreamsMetadataState
> > and partitionsByHostState to activePartitionsByHostState
> > in StreamsPartitionAssignor
> >- We have also added the pseudo code of how all the changes will exist
> > together and support the new querying APIs
> >
> > Please let me know if anything is pending now, before a vote can be
> > started on this.   On Saturday, 26 October, 2019, 05:41:44 pm IST, Navinder
> > Brar  wrote:
> >
> >  >> Since there are two soft votes for separate active/standby API
> > methods, I also change my position on that. Fine with 2 separate
> > methods. Once we remove the lag information from these APIs, returning a
> > List is less attractive, since the ordering has no special meaning now.
> > Agreed, now that we are not returning lag, I am also sold on having two
> > separate functions. We already have one which returns streamsMetadata for
> > active tasks, and now we can add another one for standbys.
> >
> >
> >
> > On Saturday, 26 October, 2019, 03:55:16 am IST, Vinoth Chandar <
> > vchan...@confluent.io> wrote:
> >
> >  +1 to Sophie's suggestion. Having both lag in terms of time and offsets is
> > good and makes for a more complete API.
> >
> > Since there are two soft votes for separate active/standby API methods, I
> > also change my position on that. Fine with 2 separate methods.
> > Once we remove the lag information from these APIs, returning a List is
> > less attractive, since the ordering has no special meaning now.
> >
> > >> lag in offsets vs time: Having both, as suggested by Sophie would of
> > course be best. What is a little unclear to me is, how in details are we
> > going to compute both?
> > @navinder may be next step is to flesh out these details and surface any
> > larger changes we need to make if need be.
> >
> > Any other details we need to cover, before a VOTE can be called on this?
> >
> >
> > On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck  wrote:
> >
> > > I am jumping in a little late here.
> > >
> > > Overall I agree with the proposal to push decision making on what/how to
> > > query in the query layer.
> > >
> > > For point 5 from above, I'm slightly in favor of having a new method,
> > > "standbyMetadataForKey()" or something similar.
> > > Because even if we return all tasks in one list, the user will still have
> > > to perform some filtering to separate the different tasks, so I don't
> > feel
> > > making two calls is a burden, and IMHO makes things more transparent for
> > > the user.
> > > If the final vote is for using an "isActive" field, I'm good with that as
> > > well.
> > >
> > > Just my 2 cents.
> > >
> > > On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
> > >  wrote:
> > >
> > > > I think now we are aligned on almost all the design parts. Summarising
> > > > below what has been discussed above and we have a general consensus on.
> > > >
> > > >
> > > >- Rather than broadcasting lag across all nodes at rebalancing/with
> > > the
> > > > heartbeat, we will just return a list of all available standby’s in the
> > > > system and the user can make IQ query any of those nodes which will
> > > return
> > > > the response, and the lag and offset time. Based on which user 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-31 Thread Vinoth Chandar
Wondering if anyone has thoughts on these changes? I liked that the new
metadata fetch APIs provide all the information at once with consistent
naming..

Any guidance on what you would like to be discussed or fleshed out more
before we call a VOTE?

On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar
 wrote:

> Hi,
> We have made some edits in the KIP(
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance)
> after due deliberation on the agreed design to support the new query
> design. This includes the new public API to query offset/time lag
> information and other details related to querying standby tasks which have
> come up after thinking of thorough details.
>
>
>
>- Addition of new config, “lag.fetch.interval.ms” to configure the
> interval of time/offset lag
>- Addition of new class StoreLagInfo to store the periodically obtained
> time/offset lag
>- Addition of two new functions in KafkaStreams, List
> allLagInfo() and List lagInfoForStore(String storeName) to
> return the lag information for an instance and a store respectively
>- Addition of new class KeyQueryMetadata. We need topicPartition for
> each key to be matched with the lag API for the topic partition. One way is
> to add new functions and fetch topicPartition from StreamsMetadataState but
> we thought having one call and fetching StreamsMetadata and topicPartition
> is more cleaner.
>-
> Renaming partitionsForHost to activePartitionsForHost in StreamsMetadataState
> and partitionsByHostState to activePartitionsByHostState
> in StreamsPartitionAssignor
>- We have also added the pseudo code of how all the changes will exist
> together and support the new querying APIs
>
> Please let me know if anything is pending now, before a vote can be
> started on this.   On Saturday, 26 October, 2019, 05:41:44 pm IST, Navinder
> Brar  wrote:
>
>  >> Since there are two soft votes for separate active/standby API
> methods, I also change my position on that. Fine with 2 separate
> methods. Once we remove the lag information from these APIs, returning a
> List is less attractive, since the ordering has no special meaning now.
> Agreed, now that we are not returning lag, I am also sold on having two
> separate functions. We already have one which returns streamsMetadata for
> active tasks, and now we can add another one for standbys.
>
>
>
> On Saturday, 26 October, 2019, 03:55:16 am IST, Vinoth Chandar <
> vchan...@confluent.io> wrote:
>
>  +1 to Sophie's suggestion. Having both lag in terms of time and offsets is
> good and makes for a more complete API.
>
> Since there are two soft votes for separate active/standby API methods, I
> also change my position on that. Fine with 2 separate methods.
> Once we remove the lag information from these APIs, returning a List is
> less attractive, since the ordering has no special meaning now.
>
> >> lag in offsets vs time: Having both, as suggested by Sophie would of
> course be best. What is a little unclear to me is, how in details are we
> going to compute both?
> @navinder may be next step is to flesh out these details and surface any
> larger changes we need to make if need be.
>
> Any other details we need to cover, before a VOTE can be called on this?
>
>
> On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck  wrote:
>
> > I am jumping in a little late here.
> >
> > Overall I agree with the proposal to push decision making on what/how to
> > query in the query layer.
> >
> > For point 5 from above, I'm slightly in favor of having a new method,
> > "standbyMetadataForKey()" or something similar.
> > Because even if we return all tasks in one list, the user will still have
> > to perform some filtering to separate the different tasks, so I don't
> feel
> > making two calls is a burden, and IMHO makes things more transparent for
> > the user.
> > If the final vote is for using an "isActive" field, I'm good with that as
> > well.
> >
> > Just my 2 cents.
> >
> > On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
> >  wrote:
> >
> > > I think now we are aligned on almost all the design parts. Summarising
> > > below what has been discussed above and we have a general consensus on.
> > >
> > >
> > >- Rather than broadcasting lag across all nodes at rebalancing/with
> > the
> > > heartbeat, we will just return a list of all available standby’s in the
> > > system and the user can make IQ query any of those nodes which will
> > return
> > > the response, and the lag and offset time. Based on which user can
> decide
> > > if he wants to return the response back or call another standby.
> > >-  The current metadata query frequency will not change. It will be
> > the
> > > same as it does now, i.e. before each query.
> > >
> > >-  For fetching list in StreamsMetadataState.java
> and
> > > List in StreamThreadStateStoreProvider.java
> > (which
> > > will return all active stores which are running/restoring and replica
> > > stores 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-30 Thread Navinder Brar
Hi,
We have made some edits in the 
KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance)
 after due deliberation on the agreed design to support the new query design. 
This includes the new public API to query offset/time lag information and other 
details related to querying standby tasks which have come up after thinking 
through the details (a condensed sketch of the additions follows the list below). 


   
   - Addition of new config, “lag.fetch.interval.ms”, to configure the refresh 
interval for the time/offset lag
   - Addition of new class StoreLagInfo to store the periodically obtained 
time/offset lag 
   - Addition of two new functions in KafkaStreams, List<StoreLagInfo> 
allLagInfo() and List<StoreLagInfo> lagInfoForStore(String storeName), to return 
the lag information for an instance and a store respectively
   - Addition of new class KeyQueryMetadata. We need the topicPartition for each 
key to be matched against the lag API for that topic partition. One way is to add 
new functions and fetch the topicPartition from StreamsMetadataState, but we thought 
having one call that fetches both StreamsMetadata and topicPartition is cleaner.
   - Renaming partitionsForHost to activePartitionsForHost in 
StreamsMetadataState and partitionsByHostState to activePartitionsByHostState 
in StreamsPartitionAssignor
   - We have also added the pseudo code of how all the changes will exist 
together and support the new querying APIs
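
Below is the condensed sketch referenced above (signatures paraphrased from this summary; field names are illustrative and the KIP page remains the authoritative definition):

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.state.StreamsMetadata;

// Holds the periodically refreshed lag for one store partition
// (refreshed every "lag.fetch.interval.ms"); field names are illustrative.
class StoreLagInfo {
    String storeName;
    TopicPartition changelogPartition;
    long offsetLagEstimate;
    long timeLagEstimateMs;
}

// Bundles the instance metadata and the topic partition for a key, so a single call
// is enough to route the query and match it against the lag API.
class KeyQueryMetadata {
    StreamsMetadata streamsMetadata;
    TopicPartition topicPartition;
}

// New KafkaStreams methods described above (sketch):
//   List<StoreLagInfo> allLagInfo();
//   List<StoreLagInfo> lagInfoForStore(String storeName);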

Please let me know if anything is pending now, before a vote can be started on 
this.   On Saturday, 26 October, 2019, 05:41:44 pm IST, Navinder Brar 
 wrote:  
 
 >> Since there are two soft votes for separate active/standby API methods, I 
 >>also change my position on that. Fine with 2 separate methods. Once we 
 >>remove the lag information from these APIs, returning a List is less 
 >>attractive, since the ordering has no special meaning now.
Agreed, now that we are not returning lag, I am also sold on having two 
separate functions. We already have one which returns streamsMetadata for 
active tasks, and now we can add another one for standbys.



    On Saturday, 26 October, 2019, 03:55:16 am IST, Vinoth Chandar 
 wrote:  
 
 +1 to Sophie's suggestion. Having both lag in terms of time and offsets is
good and makes for a more complete API.

Since there are two soft votes for separate active/standby API methods, I
also change my position on that. Fine with 2 separate methods.
Once we remove the lag information from these APIs, returning a List is
less attractive, since the ordering has no special meaning now.

>> lag in offsets vs time: Having both, as suggested by Sophie would of
course be best. What is a little unclear to me is, how in details are we
going to compute both?
@navinder may be next step is to flesh out these details and surface any
larger changes we need to make if need be.

Any other details we need to cover, before a VOTE can be called on this?


On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck  wrote:

> I am jumping in a little late here.
>
> Overall I agree with the proposal to push decision making on what/how to
> query in the query layer.
>
> For point 5 from above, I'm slightly in favor of having a new method,
> "standbyMetadataForKey()" or something similar.
> Because even if we return all tasks in one list, the user will still have
> to perform some filtering to separate the different tasks, so I don't feel
> making two calls is a burden, and IMHO makes things more transparent for
> the user.
> If the final vote is for using an "isActive" field, I'm good with that as
> well.
>
> Just my 2 cents.
>
> On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
>  wrote:
>
> > I think now we are aligned on almost all the design parts. Summarising
> > below what has been discussed above and we have a general consensus on.
> >
> >
> >    - Rather than broadcasting lag across all nodes at rebalancing/with
> the
> > heartbeat, we will just return a list of all available standby’s in the
> > system and the user can make IQ query any of those nodes which will
> return
> > the response, and the lag and offset time. Based on which user can decide
> > if he wants to return the response back or call another standby.
> >    -  The current metadata query frequency will not change. It will be
> the
> > same as it does now, i.e. before each query.
> >
> >    -  For fetching list in StreamsMetadataState.java and
> > List in StreamThreadStateStoreProvider.java
> (which
> > will return all active stores which are running/restoring and replica
> > stores which are running), we will add new functions and not disturb the
> > existing functions
> >
> >    - There is no need to add new StreamsConfig for implementing this KIP
> >
> >    - We will add standbyPartitionsByHost in AssignmentInfo and
> > StreamsMetadataState which would change the existing rebuildMetadata()
> and
> > setPartitionsByHostState()
> >
> >
> >
> > If anyone has any more concerns please feel free to add. Post this I will
> > be initiating a vote.
> > ~Navinder
> >

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-26 Thread Navinder Brar
>> Since there are two soft votes for separate active/standby API methods, I 
>>also change my position on that. Fine with 2 separate methods. Once we remove 
>>the lag information from these APIs, returning a List is less attractive, 
>>since the ordering has no special meaning now.
Agreed, now that we are not returning lag, I am also sold on having two 
separate functions. We already have one which returns streamsMetadata for 
active tasks, and now we can add another one for standbys.



On Saturday, 26 October, 2019, 03:55:16 am IST, Vinoth Chandar 
 wrote:  
 
 +1 to Sophie's suggestion. Having both lag in terms of time and offsets is
good and makes for a more complete API.

Since there are two soft votes for separate active/standby API methods, I
also change my position on that. Fine with 2 separate methods.
Once we remove the lag information from these APIs, returning a List is
less attractive, since the ordering has no special meaning now.

>> lag in offsets vs time: Having both, as suggested by Sophie would of
course be best. What is a little unclear to me is, how in details are we
going to compute both?
@navinder may be next step is to flesh out these details and surface any
larger changes we need to make if need be.

Any other details we need to cover, before a VOTE can be called on this?


On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck  wrote:

> I am jumping in a little late here.
>
> Overall I agree with the proposal to push decision making on what/how to
> query in the query layer.
>
> For point 5 from above, I'm slightly in favor of having a new method,
> "standbyMetadataForKey()" or something similar.
> Because even if we return all tasks in one list, the user will still have
> to perform some filtering to separate the different tasks, so I don't feel
> making two calls is a burden, and IMHO makes things more transparent for
> the user.
> If the final vote is for using an "isActive" field, I'm good with that as
> well.
>
> Just my 2 cents.
>
> On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
>  wrote:
>
> > I think now we are aligned on almost all the design parts. Summarising
> > below what has been discussed above and we have a general consensus on.
> >
> >
> >    - Rather than broadcasting lag across all nodes at rebalancing/with
> the
> > heartbeat, we will just return a list of all available standby’s in the
> > system and the user can make IQ query any of those nodes which will
> return
> > the response, and the lag and offset time. Based on which user can decide
> > if he wants to return the response back or call another standby.
> >    -  The current metadata query frequency will not change. It will be
> the
> > same as it does now, i.e. before each query.
> >
> >    -  For fetching list in StreamsMetadataState.java and
> > List in StreamThreadStateStoreProvider.java
> (which
> > will return all active stores which are running/restoring and replica
> > stores which are running), we will add new functions and not disturb the
> > existing functions
> >
> >    - There is no need to add new StreamsConfig for implementing this KIP
> >
> >    - We will add standbyPartitionsByHost in AssignmentInfo and
> > StreamsMetadataState which would change the existing rebuildMetadata()
> and
> > setPartitionsByHostState()
> >
> >
> >
> > If anyone has any more concerns please feel free to add. Post this I will
> > be initiating a vote.
> > ~Navinder
> >
> >    On Friday, 25 October, 2019, 12:05:29 pm IST, Matthias J. Sax <
> > matth...@confluent.io> wrote:
> >
> >  Just to close the loop @Vinoth:
> >
> > > 1. IIUC John intends to add (or we can do this in this KIP) lag
> > information
> > > to AssignmentInfo, which gets sent to every participant.
> >
> > As explained by John, currently KIP-441 plans to only report the
> > information to the leader. But I guess, with the new proposal to not
> > broadcast this information anyway, this concern is invalidated anyway
> >
> > > 2. At-least I was under the assumption that it can be called per query,
> > > since the API docs don't seem to suggest otherwise. Do you see any
> > > potential issues if we call this every query? (we should benchmark this
> > > nonetheless)
> >
> > I did not see a real issue if people refresh the metadata frequently,
> > because it would be a local call. My main point was, that this would
> > change the current usage pattern of the API, and we would clearly need
> > to communicate this change. Similar to (1), this concern in invalidated
> > anyway.
> >
> >
> > @John: I think it's a great idea to get rid of reporting lag, and
> > pushing the decision making process about "what to query" into the query
> > serving layer itself. This simplifies the overall design of this KIP
> > significantly, and actually aligns very well with the idea that Kafka
> > Streams (as it is a library) should only provide the basic building
> > block. Many of my raised questions are invalided by this.
> >
> >
> >
> > Some questions are still open though:
> 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Vinoth Chandar
+1 to Sophie's suggestion. Having both lag in terms of time and offsets is
good and makes for a more complete API.

Since there are two soft votes for separate active/standby API methods, I
also change my position on that. Fine with 2 separate methods.
Once we remove the lag information from these APIs, returning a List is
less attractive, since the ordering has no special meaning now.

>> lag in offsets vs time: Having both, as suggested by Sophie would of
course be best. What is a little unclear to me is, how in details are we
going to compute both?
@navinder maybe the next step is to flesh out these details and surface any
larger changes we need to make, if need be.

Any other details we need to cover, before a VOTE can be called on this?


On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck  wrote:

> I am jumping in a little late here.
>
> Overall I agree with the proposal to push decision making on what/how to
> query in the query layer.
>
> For point 5 from above, I'm slightly in favor of having a new method,
> "standbyMetadataForKey()" or something similar.
> Because even if we return all tasks in one list, the user will still have
> to perform some filtering to separate the different tasks, so I don't feel
> making two calls is a burden, and IMHO makes things more transparent for
> the user.
> If the final vote is for using an "isActive" field, I'm good with that as
> well.
>
> Just my 2 cents.
>
> On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
>  wrote:
>
> > I think now we are aligned on almost all the design parts. Summarising
> > below what has been discussed above and we have a general consensus on.
> >
> >
> >- Rather than broadcasting lag across all nodes at rebalancing/with
> the
> > heartbeat, we will just return a list of all available standby’s in the
> > system and the user can make IQ query any of those nodes which will
> return
> > the response, and the lag and offset time. Based on which user can decide
> > if he wants to return the response back or call another standby.
> >-  The current metadata query frequency will not change. It will be
> the
> > same as it does now, i.e. before each query.
> >
> >-  For fetching list in StreamsMetadataState.java and
> > List in StreamThreadStateStoreProvider.java
> (which
> > will return all active stores which are running/restoring and replica
> > stores which are running), we will add new functions and not disturb the
> > existing functions
> >
> >- There is no need to add new StreamsConfig for implementing this KIP
> >
> >- We will add standbyPartitionsByHost in AssignmentInfo and
> > StreamsMetadataState which would change the existing rebuildMetadata()
> and
> > setPartitionsByHostState()
> >
> >
> >
> > If anyone has any more concerns please feel free to add. Post this I will
> > be initiating a vote.
> > ~Navinder
> >
> > On Friday, 25 October, 2019, 12:05:29 pm IST, Matthias J. Sax <
> > matth...@confluent.io> wrote:
> >
> >  Just to close the loop @Vinoth:
> >
> > > 1. IIUC John intends to add (or we can do this in this KIP) lag
> > information
> > > to AssignmentInfo, which gets sent to every participant.
> >
> > As explained by John, currently KIP-441 plans to only report the
> > information to the leader. But I guess, with the new proposal to not
> > broadcast this information anyway, this concern is invalidated anyway
> >
> > > 2. At-least I was under the assumption that it can be called per query,
> > > since the API docs don't seem to suggest otherwise. Do you see any
> > > potential issues if we call this every query? (we should benchmark this
> > > nonetheless)
> >
> > I did not see a real issue if people refresh the metadata frequently,
> > because it would be a local call. My main point was, that this would
> > change the current usage pattern of the API, and we would clearly need
> > to communicate this change. Similar to (1), this concern in invalidated
> > anyway.
> >
> >
> > @John: I think it's a great idea to get rid of reporting lag, and
> > pushing the decision making process about "what to query" into the query
> > serving layer itself. This simplifies the overall design of this KIP
> > significantly, and actually aligns very well with the idea that Kafka
> > Streams (as it is a library) should only provide the basic building
> > block. Many of my raised questions are invalided by this.
> >
> >
> >
> > Some questions are still open though:
> >
> > > 10) Do we need to distinguish between active(restoring) and standby
> > > tasks? Or could be treat both as the same?
> >
> >
> > @Vinoth: about (5). I see your point about multiple calls vs a single
> > call. I still slightly prefer multiple calls, but it's highly subjective
> > and I would also be fine to add an #isActive() method. Would be good the
> > get feedback from others.
> >
> >
> > For (14), ie, lag in offsets vs time: Having both, as suggested by
> > Sophie would of course be best. What is a little unclear to me is, how
> > in details are we going to 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Bill Bejeck
I am jumping in a little late here.

Overall I agree with the proposal to push decision making on what/how to
query in the query layer.

For point 5 from above, I'm slightly in favor of having a new method,
"standbyMetadataForKey()" or something similar.
Because even if we return all tasks in one list, the user will still have
to perform some filtering to separate the different tasks, so I don't feel
making two calls is a burden, and IMHO makes things more transparent for
the user.
If the final vote is for using an "isActive" field, I'm good with that as
well.

Just my 2 cents.

On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
 wrote:

> I think now we are aligned on almost all the design parts. Summarising
> below what has been discussed above and we have a general consensus on.
>
>
>- Rather than broadcasting lag across all nodes at rebalancing/with the
> heartbeat, we will just return a list of all available standby’s in the
> system and the user can make IQ query any of those nodes which will return
> the response, and the lag and offset time. Based on which user can decide
> if he wants to return the response back or call another standby.
>-  The current metadata query frequency will not change. It will be the
> same as it does now, i.e. before each query.
>
>-  For fetching list in StreamsMetadataState.java and
> List in StreamThreadStateStoreProvider.java (which
> will return all active stores which are running/restoring and replica
> stores which are running), we will add new functions and not disturb the
> existing functions
>
>- There is no need to add new StreamsConfig for implementing this KIP
>
>- We will add standbyPartitionsByHost in AssignmentInfo and
> StreamsMetadataState which would change the existing rebuildMetadata() and
> setPartitionsByHostState()
>
>
>
> If anyone has any more concerns please feel free to add. Post this I will
> be initiating a vote.
> ~Navinder
>
> On Friday, 25 October, 2019, 12:05:29 pm IST, Matthias J. Sax <
> matth...@confluent.io> wrote:
>
>  Just to close the loop @Vinoth:
>
> > 1. IIUC John intends to add (or we can do this in this KIP) lag
> information
> > to AssignmentInfo, which gets sent to every participant.
>
> As explained by John, currently KIP-441 plans to only report the
> information to the leader. But I guess, with the new proposal to not
> broadcast this information anyway, this concern is invalidated anyway
>
> > 2. At-least I was under the assumption that it can be called per query,
> > since the API docs don't seem to suggest otherwise. Do you see any
> > potential issues if we call this every query? (we should benchmark this
> > nonetheless)
>
> I did not see a real issue if people refresh the metadata frequently,
> because it would be a local call. My main point was, that this would
> change the current usage pattern of the API, and we would clearly need
> to communicate this change. Similar to (1), this concern in invalidated
> anyway.
>
>
> @John: I think it's a great idea to get rid of reporting lag, and
> pushing the decision making process about "what to query" into the query
> serving layer itself. This simplifies the overall design of this KIP
> significantly, and actually aligns very well with the idea that Kafka
> Streams (as it is a library) should only provide the basic building
> block. Many of my raised questions are invalided by this.
>
>
>
> Some questions are still open though:
>
> > 10) Do we need to distinguish between active(restoring) and standby
> > tasks? Or could be treat both as the same?
>
>
> @Vinoth: about (5). I see your point about multiple calls vs a single
> call. I still slightly prefer multiple calls, but it's highly subjective
> and I would also be fine to add an #isActive() method. Would be good the
> get feedback from others.
>
>
> For (14), ie, lag in offsets vs time: Having both, as suggested by
> Sophie would of course be best. What is a little unclear to me is, how
> in details are we going to compute both?
>
>
>
> -Matthias
>
>
>
> On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote:
> > Just to chime in on the "report lag vs timestamp difference" issue, I
> would
> > actually advocate for both. As mentioned already, time difference is
> > probably a lot easier and/or more useful to reason about in terms of
> > "freshness"
> > of the state. But in the case when all queried stores are far behind, lag
> > could
> > be used to estimate the recovery velocity. You can then get a (pretty
> rough)
> > idea of when a store might be ready, and wait until around then to query
> > again.
> >
> > On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang 
> wrote:
> >
> >> I think I agree with John's recent reasoning as well: instead of letting
> >> the storeMetadataAPI to return the staleness information, letting the
> >> client to query either active or standby and letting standby query
> response
> >> to include both the values + timestamp (or lag, as in diffs of
> timestamps)
> >> would actually be 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Navinder Brar
I think now we are aligned on almost all the design parts. Summarising below 
what has been discussed above and we have a general consensus on.

   
   - Rather than broadcasting lag across all nodes at rebalancing/with the 
heartbeat, we will just return a list of all available standbys in the system, 
and the user can make an IQ query to any of those nodes, which will return the 
response along with the offset and time lag. Based on that, the user can decide 
whether to return the response or call another standby (see the routing sketch 
after this list).
   -  The current metadata query frequency will not change. It will be the same 
as it is now, i.e. before each query.   

   -  For fetching list in StreamsMetadataState.java and 
List in StreamThreadStateStoreProvider.java (which will 
return all active stores which are running/restoring and replica stores which 
are running), we will add new functions and not disturb the existing functions  
 

   - There is no need to add new StreamsConfig for implementing this KIP   

   - We will add standbyPartitionsByHost in AssignmentInfo and 
StreamsMetadataState which would change the existing rebuildMetadata() and 
setPartitionsByHostState()   
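
And the routing sketch referenced in the first point above (hypothetical application-side code, not a Streams API):

import java.util.List;
import java.util.Optional;

// Hypothetical application-side routing: query the active host first, then fall back to
// standbys whose reported time lag is within what the application tolerates.
class StaleReadRouter {

    interface RemoteStore {
        // Returns the value plus the serving host's reported time lag, or empty on failure.
        Optional<ValueWithLag> get(String host, String store, String key);
    }

    static final class ValueWithLag {
        final String value;
        final long timeLagMs;

        ValueWithLag(String value, long timeLagMs) {
            this.value = value;
            this.timeLagMs = timeLagMs;
        }
    }

    static Optional<ValueWithLag> query(RemoteStore remote, String store, String key,
                                        String activeHost, List<String> standbyHosts,
                                        long maxTolerableLagMs) {
        Optional<ValueWithLag> fromActive = remote.get(activeHost, store, key);
        if (fromActive.isPresent()) {
            return fromActive;
        }
        for (String standby : standbyHosts) {
            Optional<ValueWithLag> candidate = remote.get(standby, store, key);
            if (candidate.isPresent() && candidate.get().timeLagMs <= maxTolerableLagMs) {
                return candidate;
            }
        }
        return Optional.empty();
    }
}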



If anyone has any more concerns please feel free to add. Post this I will be 
initiating a vote.
~Navinder

On Friday, 25 October, 2019, 12:05:29 pm IST, Matthias J. Sax 
 wrote:  
 
 Just to close the loop @Vinoth:

> 1. IIUC John intends to add (or we can do this in this KIP) lag information
> to AssignmentInfo, which gets sent to every participant.

As explained by John, currently KIP-441 plans to only report the
information to the leader. But I guess, with the new proposal to not
broadcast this information anyway, this concern is invalidated anyway

> 2. At-least I was under the assumption that it can be called per query,
> since the API docs don't seem to suggest otherwise. Do you see any
> potential issues if we call this every query? (we should benchmark this
> nonetheless)

I did not see a real issue if people refresh the metadata frequently,
because it would be a local call. My main point was that this would
change the current usage pattern of the API, and we would clearly need
to communicate this change. Similar to (1), this concern is invalidated
anyway.


@John: I think it's a great idea to get rid of reporting lag, and
pushing the decision making process about "what to query" into the query
serving layer itself. This simplifies the overall design of this KIP
significantly, and actually aligns very well with the idea that Kafka
Streams (as it is a library) should only provide the basic building
block. Many of my raised questions are invalidated by this.



Some questions are still open though:

> 10) Do we need to distinguish between active(restoring) and standby
> tasks? Or could we treat both as the same?


@Vinoth: about (5). I see your point about multiple calls vs a single
call. I still slightly prefer multiple calls, but it's highly subjective
and I would also be fine to add an #isActive() method. Would be good to
get feedback from others.


For (14), i.e., lag in offsets vs time: having both, as suggested by
Sophie, would of course be best. What is a little unclear to me is how,
in detail, we are going to compute both.



-Matthias



On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote:
> Just to chime in on the "report lag vs timestamp difference" issue, I would
> actually advocate for both. As mentioned already, time difference is
> probably a lot easier and/or more useful to reason about in terms of
> "freshness"
> of the state. But in the case when all queried stores are far behind, lag
> could
> be used to estimate the recovery velocity. You can then get a (pretty rough)
> idea of when a store might be ready, and wait until around then to query
> again.
> 
> On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang  wrote:
> 
>> I think I agree with John's recent reasoning as well: instead of letting
>> the storeMetadataAPI to return the staleness information, letting the
>> client to query either active or standby and letting standby query response
>> to include both the values + timestamp (or lag, as in diffs of timestamps)
>> would actually be more intuitive -- not only the streams client is simpler,
>> from user's perspective they also do not need to periodically refresh their
>> staleness information from the client, but only need to make decisions on
>> the fly whenever they need to query.
>>
>> Again the standby replica then need to know the current active task's
>> timestamp, which can be found from the log end record's encoded timestamp;
>> today we standby tasks do not read that specific record, but only refresh
>> its knowledge on the log end offset, but I think refreshing the latest
>> record timestamp is not a very bad request to add on the standby replicas.
>>
>>
>> Guozhang
>>
>>
>> On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar 
>> wrote:
>>
>>> +1 As someone implementing a query routing layer, there is already a need
>>> to have 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Matthias J. Sax
Just to close the loop @Vinoth:

> 1. IIUC John intends to add (or we can do this in this KIP) lag information
> to AssignmentInfo, which gets sent to every participant.

As explained by John, currently KIP-441 plans to only report the
information to the leader. But with the new proposal to not broadcast
this information anyway, this concern is invalidated.

> 2. At-least I was under the assumption that it can be called per query,
> since the API docs don't seem to suggest otherwise. Do you see any
> potential issues if we call this every query? (we should benchmark this
> nonetheless)

I did not see a real issue if people refresh the metadata frequently,
because it would be a local call. My main point was that this would
change the current usage pattern of the API, and we would clearly need
to communicate this change. Similar to (1), this concern is invalidated
anyway.


@John: I think it's a great idea to get rid of reporting lag, and to
push the decision-making process about "what to query" into the query
serving layer itself. This simplifies the overall design of this KIP
significantly, and actually aligns very well with the idea that Kafka
Streams (as it is a library) should only provide the basic building
block. Many of my raised questions are invalidated by this.



Some questions are still open though:

> 10) Do we need to distinguish between active(restoring) and standby
> tasks? Or could we treat both as the same?


@Vinoth: about (5). I see your point about multiple calls vs a single
call. I still slightly prefer multiple calls, but it's highly subjective
and I would also be fine to add an #isActive() method. Would be good to
get feedback from others.


For (14), i.e., lag in offsets vs time: having both, as suggested by
Sophie, would of course be best. What is a little unclear to me is how,
in detail, we are going to compute both.



-Matthias



On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote:
> Just to chime in on the "report lag vs timestamp difference" issue, I would
> actually advocate for both. As mentioned already, time difference is
> probably a lot easier and/or more useful to reason about in terms of
> "freshness"
> of the state. But in the case when all queried stores are far behind, lag
> could
> be used to estimate the recovery velocity. You can then get a (pretty rough)
> idea of when a store might be ready, and wait until around then to query
> again.
> 
> On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang  wrote:
> 
>> I think I agree with John's recent reasoning as well: instead of letting
>> the storeMetadataAPI to return the staleness information, letting the
>> client to query either active or standby and letting standby query response
>> to include both the values + timestamp (or lag, as in diffs of timestamps)
>> would actually be more intuitive -- not only the streams client is simpler,
>> from user's perspective they also do not need to periodically refresh their
>> staleness information from the client, but only need to make decisions on
>> the fly whenever they need to query.
>>
>> Again the standby replica then need to know the current active task's
>> timestamp, which can be found from the log end record's encoded timestamp;
>> today we standby tasks do not read that specific record, but only refresh
>> its knowledge on the log end offset, but I think refreshing the latest
>> record timestamp is not a very bad request to add on the standby replicas.
>>
>>
>> Guozhang
>>
>>
>> On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar 
>> wrote:
>>
>>> +1 As someone implementing a query routing layer, there is already a need
>>> to have mechanisms in place to do healthchecks/failure detection to
>> detect
>>> failures for queries, while Streams rebalancing eventually kicks in the
>>> background.
>>> So, pushing this complexity to the IQ client app keeps Streams simpler as
>>> well. IQs will be potentially issues at an order of magnitude more
>>> frequently and it can achieve good freshness for the lag information.
>>>
>>> I would like to add however, that we would also need to introduce apis in
>>> KafkaStreams class, for obtaining lag information for all stores local to
>>> that host. This is for the IQs to relay back with the response/its own
>>> heartbeat mechanism.
>>>
>>> On Thu, Oct 24, 2019 at 3:12 PM John Roesler  wrote:
>>>
 Hi all,

 I've been mulling about this KIP, and I think I was on the wrong track
 earlier with regard to task lags. Tl;dr: I don't think we should add
 lags at all to the metadata API (and also not to the AssignmentInfo
 protocol message).

 Like I mentioned early on, reporting lag via
 SubscriptionInfo/AssignmentInfo would only work while rebalances are
 happening. Once the group stabilizes, no members would be notified of
 each others' lags anymore. I had been thinking that the solution would
 be the heartbeat proposal I mentioned earlier, but that proposal would
 have reported the 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Sophie Blee-Goldman
Just to chime in on the "report lag vs timestamp difference" issue, I would
actually advocate for both. As mentioned already, time difference is
probably a lot easier and/or more useful to reason about in terms of
"freshness"
of the state. But in the case when all queried stores are far behind, lag
could
be used to estimate the recovery velocity. You can then get a (pretty rough)
idea of when a store might be ready, and wait until around then to query
again.

On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang  wrote:

> I think I agree with John's recent reasoning as well: instead of letting
> the storeMetadataAPI to return the staleness information, letting the
> client to query either active or standby and letting standby query response
> to include both the values + timestamp (or lag, as in diffs of timestamps)
> would actually be more intuitive -- not only the streams client is simpler,
> from user's perspective they also do not need to periodically refresh their
> staleness information from the client, but only need to make decisions on
> the fly whenever they need to query.
>
> Again the standby replica then need to know the current active task's
> timestamp, which can be found from the log end record's encoded timestamp;
> today we standby tasks do not read that specific record, but only refresh
> its knowledge on the log end offset, but I think refreshing the latest
> record timestamp is not a very bad request to add on the standby replicas.
>
>
> Guozhang
>
>
> On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar 
> wrote:
>
> > +1 As someone implementing a query routing layer, there is already a need
> > to have mechanisms in place to do healthchecks/failure detection to
> detect
> > failures for queries, while Streams rebalancing eventually kicks in the
> > background.
> > So, pushing this complexity to the IQ client app keeps Streams simpler as
> > well. IQs will be potentially issues at an order of magnitude more
> > frequently and it can achieve good freshness for the lag information.
> >
> > I would like to add however, that we would also need to introduce apis in
> > KafkaStreams class, for obtaining lag information for all stores local to
> > that host. This is for the IQs to relay back with the response/its own
> > heartbeat mechanism.
> >
> > On Thu, Oct 24, 2019 at 3:12 PM John Roesler  wrote:
> >
> > > Hi all,
> > >
> > > I've been mulling about this KIP, and I think I was on the wrong track
> > > earlier with regard to task lags. Tl;dr: I don't think we should add
> > > lags at all to the metadata API (and also not to the AssignmentInfo
> > > protocol message).
> > >
> > > Like I mentioned early on, reporting lag via
> > > SubscriptionInfo/AssignmentInfo would only work while rebalances are
> > > happening. Once the group stabilizes, no members would be notified of
> > > each others' lags anymore. I had been thinking that the solution would
> > > be the heartbeat proposal I mentioned earlier, but that proposal would
> > > have reported the heartbeats of the members only to the leader member
> > > (the one who makes assignments). To be useful in the context of _this_
> > > KIP, we would also have to report the lags in the heartbeat responses
> > > to of _all_ members. This is a concern to be because now _all_ the
> > > lags get reported to _all_ the members on _every_ heartbeat... a lot
> > > of chatter.
> > >
> > > Plus, the proposal for KIP-441 is only to report the lags of each
> > > _task_. This is the sum of the lags of all the stores in the tasks.
> > > But this would be insufficient for KIP-535. For this kip, we would
> > > want the lag specifically of the store we want to query. So this
> > > means, we have to report the lags of all the stores of all the members
> > > to every member... even more chatter!
> > >
> > > The final nail in the coffin to me is that IQ clients would have to
> > > start refreshing their metadata quite frequently to stay up to date on
> > > the lags, which adds even more overhead to the system.
> > >
> > > Consider a strawman alternative: we bring KIP-535 back to extending
> > > the metadata API to tell the client the active and standby replicas
> > > for the key in question (not including and "staleness/lag"
> > > restriction, just returning all the replicas). Then, the client picks
> > > a replica and sends the query. The server returns the current lag
> > > along with the response (maybe in an HTML header or something). Then,
> > > the client keeps a map of its last observed lags for each replica, and
> > > uses this information to prefer fresher replicas.
> > >
> > > OR, if it wants only to query the active replica, it would throw an
> > > error on any lag response greater than zero, refreshes its metadata by
> > > re-querying the metadata API, and tries again with the current active
> > > replica.
> > >
> > > This way, the lag information will be super fresh for the client, and
> > > we keep the Metadata API / Assignment,Subscription / and Heartbeat as
> > > 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Navinder Brar
 Sending the older replies again as they were unreadable due to bad formatting.

>> There was a follow-on idea I POCed to continuously share lag information in 
>> the heartbeat protocol
+1 that would be great, I will update the KIP assuming this work will finish 
soon




>> I think that adding a new method to StreamsMetadataState and deprecating the 
>> existing method is the best way to go; we just can't change the return types 
>> of any existing methods.

+1 on this, we will add new methods for users who would be interested in 
querying back a list of possible options to query from and leave the current 
function getStreamsMetadataForKey() untouched for users who want absolute 
consistency.




>> why not just _always_ return all available metadata (including 
>> active/standby or lag) and let the caller decide to which node they want to 
>> route the query

+1. I think this makes sense, as from a user standpoint there is no difference 
b/w an active and a standby if both have the same lag. In fact, users would be 
able to use this API to reduce query load on actives, so returning all available 
options along with the current lag in each would make sense, and we can leave it 
to the user how they want to use this data. This has another added advantage: if 
a user queries any random machine for a key and that machine has a replica for 
the partition (where the key belongs), the user might choose to serve the data 
from there itself (if it doesn't lag much) rather than finding the active and 
making an IQ call to it. This would save some critical time in serving for some 
applications.




>> Adding the lag in terms of timestamp diff comparing the committed offset.

+1 on this, I think it’s more readable. But as John said the function 
allMetadataForKey() is just returning the possible options from where users can 
query a key, so we can even drop the parameter 
enableReplicaServing/tolerableDataStaleness and just return all the 
streamsMetadata containing that key along with the offset limit.



   
Answering the questions posted by Matthias in sequence:
   1. @John has already commented on this one.
   2. Yeah, the usage pattern would include querying this prior to every request.
   3. Will add the changes to StreamsMetadata in the KIP; this would include 
changes in rebuildMetadata() etc.
   4. Makes sense, already addressed above.
   5. Is it important from a user perspective whether they are querying an 
active(processing), active(restoring), or a standby task, if we have a way of 
denoting lag in a readable manner which signifies to the user that this is the 
best node to query the fresh data?
   6. Yes, I intend to return the actives and replicas in the same return list 
in allMetadataForKey().
   7. I think we can't be dependent on propagating lag just during rebalances; 
sending it with each heartbeat every 3 secs seems like a better idea, and having 
this lower delay also saves us from this race condition.
   8. Yes, we need new functions to return activeRestoring and standbyRunning 
tasks.
   9. StreamsConfig doesn't look of much use to me since we are giving all 
possible options via this function, or they can use the existing function 
getStreamsMetadataForKey() and get just the active.
   10. I think treat them both the same and let the lag do the talking.
   11. We are just sending them the option to query from in allMetadataForKey(), 
which doesn't include any handle. We then query that machine for the key, where 
it calls allStores() and tries to find the task in 
activeRunning/activeRestoring/standbyRunning and adds the store handle here.
   12. Need to verify, but at the exact point when the store is closed to 
transition it from restoring to running, the queries will fail. The caller in 
such a case can have their own configurable retries to check again, or try the 
replica if a call to the active fails.
   13. I think KIP-216 is working on those lines; we might not need a few of 
those exceptions since now the basic idea of this KIP is to support IQ during 
rebalancing.
   14. Addressed above, agreed it looks more readable.

 

On Friday, 25 October, 2019, 10:23:57 am IST, Guozhang Wang 
 wrote:  
 
 I think I agree with John's recent reasoning as well: instead of letting
the storeMetadataAPI to return the staleness information, letting the
client to query either active or standby and letting standby query response
to include both the values + timestamp (or lag, as in diffs of timestamps)
would actually be more intuitive -- not only the streams client is simpler,
from user's perspective they also do not need to periodically refresh their
staleness information from the client, but only need to make decisions on
the fly whenever they need to query.

Again the standby replica then need to know the current active task's
timestamp, which can be found from the log end record's encoded timestamp;
today we standby tasks do not read that specific record, but only refresh
its knowledge on the log end offset, but I think refreshing the latest
record timestamp is not a very bad request to add on the standby replicas.



Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-24 Thread Guozhang Wang
I think I agree with John's recent reasoning as well: instead of letting
the store metadata API return the staleness information, letting the
client query either active or standby, and letting the standby query response
include both the values + timestamp (or lag, as in diffs of timestamps),
would actually be more intuitive -- not only is the streams client simpler,
but from the user's perspective they also do not need to periodically refresh
their staleness information from the client; they only need to make decisions
on the fly whenever they need to query.

Again, the standby replica then needs to know the current active task's
timestamp, which can be found from the log end record's encoded timestamp;
today standby tasks do not read that specific record, but only refresh
their knowledge of the log end offset. I think refreshing the latest
record timestamp is not a very big ask to add to the standby replicas.
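
A minimal sketch of the two lag flavours this implies, assuming the standby 
additionally tracks the timestamp of the record at the changelog's end (all 
names are illustrative, computed per store/changelog partition):

long offsetLag = changelogEndOffset - standbyCurrentOffset;                  // lag in number of records
long timeLagMs = changelogEndRecordTimestamp - standbyLastAppliedTimestamp;  // lag in (event) time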


Guozhang


On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar 
wrote:

> +1 As someone implementing a query routing layer, there is already a need
> to have mechanisms in place to do healthchecks/failure detection to detect
> failures for queries, while Streams rebalancing eventually kicks in the
> background.
> So, pushing this complexity to the IQ client app keeps Streams simpler as
> well. IQs will be potentially issues at an order of magnitude more
> frequently and it can achieve good freshness for the lag information.
>
> I would like to add however, that we would also need to introduce apis in
> KafkaStreams class, for obtaining lag information for all stores local to
> that host. This is for the IQs to relay back with the response/its own
> heartbeat mechanism.
>
> On Thu, Oct 24, 2019 at 3:12 PM John Roesler  wrote:
>
> > Hi all,
> >
> > I've been mulling about this KIP, and I think I was on the wrong track
> > earlier with regard to task lags. Tl;dr: I don't think we should add
> > lags at all to the metadata API (and also not to the AssignmentInfo
> > protocol message).
> >
> > Like I mentioned early on, reporting lag via
> > SubscriptionInfo/AssignmentInfo would only work while rebalances are
> > happening. Once the group stabilizes, no members would be notified of
> > each others' lags anymore. I had been thinking that the solution would
> > be the heartbeat proposal I mentioned earlier, but that proposal would
> > have reported the heartbeats of the members only to the leader member
> > (the one who makes assignments). To be useful in the context of _this_
> > KIP, we would also have to report the lags in the heartbeat responses
> > to of _all_ members. This is a concern to be because now _all_ the
> > lags get reported to _all_ the members on _every_ heartbeat... a lot
> > of chatter.
> >
> > Plus, the proposal for KIP-441 is only to report the lags of each
> > _task_. This is the sum of the lags of all the stores in the tasks.
> > But this would be insufficient for KIP-535. For this kip, we would
> > want the lag specifically of the store we want to query. So this
> > means, we have to report the lags of all the stores of all the members
> > to every member... even more chatter!
> >
> > The final nail in the coffin to me is that IQ clients would have to
> > start refreshing their metadata quite frequently to stay up to date on
> > the lags, which adds even more overhead to the system.
> >
> > Consider a strawman alternative: we bring KIP-535 back to extending
> > the metadata API to tell the client the active and standby replicas
> > for the key in question (not including and "staleness/lag"
> > restriction, just returning all the replicas). Then, the client picks
> > a replica and sends the query. The server returns the current lag
> > along with the response (maybe in an HTML header or something). Then,
> > the client keeps a map of its last observed lags for each replica, and
> > uses this information to prefer fresher replicas.
> >
> > OR, if it wants only to query the active replica, it would throw an
> > error on any lag response greater than zero, refreshes its metadata by
> > re-querying the metadata API, and tries again with the current active
> > replica.
> >
> > This way, the lag information will be super fresh for the client, and
> > we keep the Metadata API / Assignment,Subscription / and Heartbeat as
> > slim as possible.
> >
> > Side note: I do think that some time soon, we'll have to add a library
> > for IQ server/clients. I think that this logic will start to get
> > pretty complex.
> >
> > I hope this thinking is reasonably clear!
> > Thanks again,
> > -John
> >
> > Does that
> >
> > On Wed, Oct 23, 2019 at 10:16 AM Vinoth Chandar 
> > wrote:
> > >
> > > Responding to the points raised by Matthias
> > >
> > > 1. IIUC John intends to add (or we can do this in this KIP) lag
> > information
> > > to AssignmentInfo, which gets sent to every participant.
> > >
> > > 2. At-least I was under the assumption that it can be called per query,
> > > since the API docs don't seem 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-24 Thread Vinoth Chandar
+1 As someone implementing a query routing layer, there is already a need
to have mechanisms in place to do healthchecks/failure detection to detect
failures for queries, while Streams rebalancing eventually kicks in in the
background.
So, pushing this complexity to the IQ client app keeps Streams simpler as
well. IQs will potentially be issued at an order of magnitude higher
frequency, and this approach can achieve good freshness for the lag information.

I would like to add, however, that we would also need to introduce APIs in
the KafkaStreams class for obtaining lag information for all stores local to
that host. This is for the IQ servers to relay back with the response/their own
heartbeat mechanism.
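
Purely as a hypothetical shape (the name, return type, and exact semantics 
would have to be settled in the KIP), something like:

// Hypothetical addition to KafkaStreams; not an existing method.
// store name -> changelog partition -> lag in offsets, for every store hosted on this instance
public Map<String, Map<Integer, Long>> localStoreLags();

An IQ server could then attach these numbers to its query responses or to its 
own heartbeat/healthcheck mechanism.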

On Thu, Oct 24, 2019 at 3:12 PM John Roesler  wrote:

> Hi all,
>
> I've been mulling about this KIP, and I think I was on the wrong track
> earlier with regard to task lags. Tl;dr: I don't think we should add
> lags at all to the metadata API (and also not to the AssignmentInfo
> protocol message).
>
> Like I mentioned early on, reporting lag via
> SubscriptionInfo/AssignmentInfo would only work while rebalances are
> happening. Once the group stabilizes, no members would be notified of
> each others' lags anymore. I had been thinking that the solution would
> be the heartbeat proposal I mentioned earlier, but that proposal would
> have reported the heartbeats of the members only to the leader member
> (the one who makes assignments). To be useful in the context of _this_
> KIP, we would also have to report the lags in the heartbeat responses
> to of _all_ members. This is a concern to be because now _all_ the
> lags get reported to _all_ the members on _every_ heartbeat... a lot
> of chatter.
>
> Plus, the proposal for KIP-441 is only to report the lags of each
> _task_. This is the sum of the lags of all the stores in the tasks.
> But this would be insufficient for KIP-535. For this kip, we would
> want the lag specifically of the store we want to query. So this
> means, we have to report the lags of all the stores of all the members
> to every member... even more chatter!
>
> The final nail in the coffin to me is that IQ clients would have to
> start refreshing their metadata quite frequently to stay up to date on
> the lags, which adds even more overhead to the system.
>
> Consider a strawman alternative: we bring KIP-535 back to extending
> the metadata API to tell the client the active and standby replicas
> for the key in question (not including and "staleness/lag"
> restriction, just returning all the replicas). Then, the client picks
> a replica and sends the query. The server returns the current lag
> along with the response (maybe in an HTML header or something). Then,
> the client keeps a map of its last observed lags for each replica, and
> uses this information to prefer fresher replicas.
>
> OR, if it wants only to query the active replica, it would throw an
> error on any lag response greater than zero, refreshes its metadata by
> re-querying the metadata API, and tries again with the current active
> replica.
>
> This way, the lag information will be super fresh for the client, and
> we keep the Metadata API / Assignment,Subscription / and Heartbeat as
> slim as possible.
>
> Side note: I do think that some time soon, we'll have to add a library
> for IQ server/clients. I think that this logic will start to get
> pretty complex.
>
> I hope this thinking is reasonably clear!
> Thanks again,
> -John
>
> Does that
>
> On Wed, Oct 23, 2019 at 10:16 AM Vinoth Chandar 
> wrote:
> >
> > Responding to the points raised by Matthias
> >
> > 1. IIUC John intends to add (or we can do this in this KIP) lag
> information
> > to AssignmentInfo, which gets sent to every participant.
> >
> > 2. At-least I was under the assumption that it can be called per query,
> > since the API docs don't seem to suggest otherwise. Do you see any
> > potential issues if we call this every query? (we should benchmark this
> > nonetheless)
> >
> > 4. Agree. metadataForKey() implicitly would return the active host
> metadata
> > (as it was before). We should also document this in that APIs javadoc,
> > given we have another method(s) that returns more host metadata now.
> >
> > 5.  While I see the point, the app/caller has to make two different APIs
> > calls to obtain active/standby and potentially do the same set of
> operation
> > to query the state. I personally still like a method like isActive()
> > better, but don't have strong opinions.
> >
> > 9. If we do expose the lag information, could we just leave it upto to
> the
> > caller to decide whether it errors out or not and not make the decision
> > within Streams? i.e we don't need a new config
> >
> > 14. +1 . If it's easier to do right away. We started with number of
> > records, following the lead from KIP-441
> >
> > On Wed, Oct 23, 2019 at 5:44 AM Navinder Brar
> >  wrote:
> >
> > >
> > > Thanks, everyone for taking a look. Some very cool ideas have flown in.
> > >
> > > >> There was a 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-24 Thread John Roesler
Hi all,

I've been mulling about this KIP, and I think I was on the wrong track
earlier with regard to task lags. Tl;dr: I don't think we should add
lags at all to the metadata API (and also not to the AssignmentInfo
protocol message).

Like I mentioned early on, reporting lag via
SubscriptionInfo/AssignmentInfo would only work while rebalances are
happening. Once the group stabilizes, no members would be notified of
each others' lags anymore. I had been thinking that the solution would
be the heartbeat proposal I mentioned earlier, but that proposal would
have reported the heartbeats of the members only to the leader member
(the one who makes assignments). To be useful in the context of _this_
KIP, we would also have to report the lags in the heartbeat responses
to _all_ members. This is a concern to me because now _all_ the
lags get reported to _all_ the members on _every_ heartbeat... a lot
of chatter.

Plus, the proposal for KIP-441 is only to report the lags of each
_task_. This is the sum of the lags of all the stores in the tasks.
But this would be insufficient for KIP-535. For this kip, we would
want the lag specifically of the store we want to query. So this
means, we have to report the lags of all the stores of all the members
to every member... even more chatter!

The final nail in the coffin to me is that IQ clients would have to
start refreshing their metadata quite frequently to stay up to date on
the lags, which adds even more overhead to the system.

Consider a strawman alternative: we bring KIP-535 back to extending
the metadata API to tell the client the active and standby replicas
for the key in question (not including any "staleness/lag"
restriction, just returning all the replicas). Then, the client picks
a replica and sends the query. The server returns the current lag
along with the response (maybe in an HTTP header or something). Then,
the client keeps a map of its last observed lags for each replica, and
uses this information to prefer fresher replicas.
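
A hypothetical sketch of that client-side bookkeeping, living entirely outside 
of Streams itself (class and method names are made up):

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Keeps the last lag each host reported with a query response and prefers the
// host that looked freshest the last time we heard from it.
class LagAwareRouter {
    private final Map<String, Long> lastObservedLag = new ConcurrentHashMap<>();

    String pickHost(List<String> candidateHosts) {
        return candidateHosts.stream()
            .min(Comparator.<String>comparingLong(
                host -> lastObservedLag.getOrDefault(host, 0L)))
            .orElseThrow(() -> new IllegalStateException("no candidate hosts"));
    }

    // Called with the lag the server piggy-backs on each query response.
    void recordLag(String host, long reportedLag) {
        lastObservedLag.put(host, reportedLag);
    }
}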

OR, if it wants only to query the active replica, it would throw an
error on any lag response greater than zero, refreshes its metadata by
re-querying the metadata API, and tries again with the current active
replica.

This way, the lag information will be super fresh for the client, and
we keep the Metadata API / Assignment,Subscription / and Heartbeat as
slim as possible.

Side note: I do think that some time soon, we'll have to add a library
for IQ server/clients. I think that this logic will start to get
pretty complex.

I hope this thinking is reasonably clear!
Thanks again,
-John


On Wed, Oct 23, 2019 at 10:16 AM Vinoth Chandar  wrote:
>
> Responding to the points raised by Matthias
>
> 1. IIUC John intends to add (or we can do this in this KIP) lag information
> to AssignmentInfo, which gets sent to every participant.
>
> 2. At-least I was under the assumption that it can be called per query,
> since the API docs don't seem to suggest otherwise. Do you see any
> potential issues if we call this every query? (we should benchmark this
> nonetheless)
>
> 4. Agree. metadataForKey() implicitly would return the active host metadata
> (as it was before). We should also document this in that APIs javadoc,
> given we have another method(s) that returns more host metadata now.
>
> 5.  While I see the point, the app/caller has to make two different APIs
> calls to obtain active/standby and potentially do the same set of operation
> to query the state. I personally still like a method like isActive()
> better, but don't have strong opinions.
>
> 9. If we do expose the lag information, could we just leave it upto to the
> caller to decide whether it errors out or not and not make the decision
> within Streams? i.e we don't need a new config
>
> 14. +1 . If it's easier to do right away. We started with number of
> records, following the lead from KIP-441
>
> On Wed, Oct 23, 2019 at 5:44 AM Navinder Brar
>  wrote:
>
> >
> > Thanks, everyone for taking a look. Some very cool ideas have flown in.
> >
> > >> There was a follow-on idea I POCed to continuously share lag
> > information in the heartbeat protocol+1 that would be great, I will update
> > the KIP assuming this work will finish soon
> > >> I think that adding a new method to StreamsMetadataState and
> > deprecating the existing method isthe best way to go; we just can't change
> > the return types of any existing methods.+1 on this, we will add new
> > methods for users who would be interested in querying back a list of
> > possible options to query from and leave the current function
> > getStreamsMetadataForKey() untouched for users who want absolute
> > consistency.
> > >> why not just always return all available metadata (including
> > active/standby or lag) and let the caller decide to which node they want to
> > route the query+1. I think this makes sense as from a user standpoint there
> > is no difference b/w an active and a standby if both have same lag, 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-23 Thread Vinoth Chandar
Responding to the points raised by Matthias

1. IIUC John intends to add (or we can do this in this KIP) lag information
to AssignmentInfo, which gets sent to every participant.

2. At-least I was under the assumption that it can be called per query,
since the API docs don't seem to suggest otherwise. Do you see any
potential issues if we call this every query? (we should benchmark this
nonetheless)

4. Agree. metadataForKey() implicitly would return the active host metadata
(as it was before). We should also document this in that API's javadoc,
given we have other method(s) that return more host metadata now.

5.  While I see the point, the app/caller has to make two different API
calls to obtain active/standby metadata and potentially do the same set of
operations to query the state. I personally still like a method like isActive()
better, but don't have strong opinions.

9. If we do expose the lag information, could we just leave it up to the
caller to decide whether it errors out or not, and not make the decision
within Streams? I.e., we don't need a new config.

14. +1 . If it's easier to do right away. We started with number of
records, following the lead from KIP-441

On Wed, Oct 23, 2019 at 5:44 AM Navinder Brar
 wrote:

>
> Thanks, everyone for taking a look. Some very cool ideas have flown in.
>
> >> There was a follow-on idea I POCed to continuously share lag
> information in the heartbeat protocol+1 that would be great, I will update
> the KIP assuming this work will finish soon
> >> I think that adding a new method to StreamsMetadataState and
> deprecating the existing method isthe best way to go; we just can't change
> the return types of any existing methods.+1 on this, we will add new
> methods for users who would be interested in querying back a list of
> possible options to query from and leave the current function
> getStreamsMetadataForKey() untouched for users who want absolute
> consistency.
> >> why not just always return all available metadata (including
> active/standby or lag) and let the caller decide to which node they want to
> route the query+1. I think this makes sense as from a user standpoint there
> is no difference b/w an active and a standby if both have same lag, Infact
> users would be able to use this API to reduce query load on actives, so
> returning all available options along with the current lag in each would
> make sense and leave it to user how they want to use this data. This has
> another added advantage. If a user queries any random machine for a key and
> that machine has a replica for the partition(where key belongs) user might
> choose to serve the data from there itself(if it doesn’t lag much) rather
> than finding the active and making an IQ to that. This would save some
> critical time in serving for some applications.
> >> Adding the lag in terms of timestamp diff comparing the committed
> offset.+1 on this, I think it’s more readable. But as John said the
> function allMetadataForKey() is just returning the possible options from
> where users can query a key, so we can even drop the parameter
> enableReplicaServing/tolerableDataStaleness and just return all the
> streamsMetadata containing that key along with the offset limit.
> Answering the questions posted by Matthias in sequence.
> 1. @John can you please comment on this one.2. Yeah the usage pattern
> would include querying this prior to every request 3. Will add the changes
> to StreamsMetadata in the KIP, would include changes in rebuildMetadata()
> etc.4. Makes sense, already addressed above5. Is it important from a user
> perspective if they are querying an  active(processing), active(restoring),
> a standby task if we have away of denoting lag in a readable manner which
> kind of signifies the user that this is the best node to query the fresh
> data.6. Yes, I intend to return the actives and replicas in the same return
> list in allMetadataForKey()7. tricky8. yes, we need new functions to return
> activeRestoring and standbyRunning tasks.9. StreamsConfig doesn’t look like
> of much use to me since we are giving all possible options via this
> function, or they can use existing function getStreamsMetadataForKey() and
> get just the active10. I think treat them both the same and let the lag do
> the talking11. We are just sending them the option to query from in
> allMetadataForKey(), which doesn’t include any handle. We then query that
> machine for the key where it calls allStores() and tries to find the task
> in activeRunning/activeRestoring/standbyRunning and adds the store handle
> here. 12. Need to verify, but during the exact point when store is closed
> to transition it from restoring to running the queries will fail. The
> caller in such case can have their own configurable retries to check again
> or try the replica if a call fails to active13. I think KIP-216 is working
> on those lines, we might not need few of those exceptions since now the
> basic idea of this KIP is to support IQ 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-23 Thread Navinder Brar
 
Thanks, everyone for taking a look. Some very cool ideas have flown in. 

>> There was a follow-on idea I POCed to continuously share lag information in 
>> the heartbeat protocol

+1 that would be great, I will update the KIP assuming this work will finish soon.

>> I think that adding a new method to StreamsMetadataState and deprecating the 
>> existing method is the best way to go; we just can't change the return types 
>> of any existing methods.

+1 on this, we will add new methods for users who would be interested in 
querying back a list of possible options to query from and leave the current 
function getStreamsMetadataForKey() untouched for users who want absolute 
consistency.

>> why not just always return all available metadata (including active/standby 
>> or lag) and let the caller decide to which node they want to route the query

+1. I think this makes sense, as from a user standpoint there is no difference 
b/w an active and a standby if both have the same lag. In fact, users would be 
able to use this API to reduce query load on actives, so returning all available 
options along with the current lag in each would make sense and leave it to the 
user how they want to use this data. This has another added advantage: if a user 
queries any random machine for a key and that machine has a replica for the 
partition (where the key belongs), the user might choose to serve the data from 
there itself (if it doesn't lag much) rather than finding the active and making 
an IQ call to it. This would save some critical time in serving for some 
applications.

>> Adding the lag in terms of timestamp diff comparing the committed offset.

+1 on this, I think it's more readable. But as John said, the function 
allMetadataForKey() is just returning the possible options from where users can 
query a key, so we can even drop the parameter 
enableReplicaServing/tolerableDataStaleness and just return all the 
streamsMetadata containing that key along with the offset limit.
Answering the questions posted by Matthias in sequence.
1. @John can you please comment on this one.
2. Yeah, the usage pattern would include querying this prior to every request.
3. Will add the changes to StreamsMetadata in the KIP; this would include 
changes in rebuildMetadata() etc.
4. Makes sense, already addressed above.
5. Is it important from a user perspective whether they are querying an 
active(processing), active(restoring), or a standby task, if we have a way of 
denoting lag in a readable manner which signifies to the user that this is the 
best node to query the fresh data?
6. Yes, I intend to return the actives and replicas in the same return list in 
allMetadataForKey().
7. Tricky.
8. Yes, we need new functions to return activeRestoring and standbyRunning 
tasks.
9. StreamsConfig doesn't look of much use to me since we are giving all 
possible options via this function, or they can use the existing function 
getStreamsMetadataForKey() and get just the active.
10. I think treat them both the same and let the lag do the talking.
11. We are just sending them the option to query from in allMetadataForKey(), 
which doesn't include any handle. We then query that machine for the key, where 
it calls allStores() and tries to find the task in 
activeRunning/activeRestoring/standbyRunning and adds the store handle here.
12. Need to verify, but at the exact point when the store is closed to 
transition it from restoring to running, the queries will fail. The caller in 
such a case can have their own configurable retries to check again, or try the 
replica if a call to the active fails.
13. I think KIP-216 is working on those lines; we might not need a few of those 
exceptions since now the basic idea of this KIP is to support IQ during 
rebalancing.
14. Addressed above, agreed it looks more readable.
 

On Tuesday, 22 October, 2019, 08:39:07 pm IST, Matthias J. Sax 
 wrote:  
 
 One more thought:

14) Is specifying the allowed lag in number of records a useful way for
users to declare how stale an instance is allowed to be? Would it be
more intuitive for users to specify the allowed lag in time units (would
event time or processing time be better)? It seems hard for users to
reason how "fresh" a store really is when number of records is used.


-Matthias

On 10/21/19 9:02 PM, Matthias J. Sax wrote:
> Some more follow up thoughts:
> 
> 11) If we get a store handle of an active(restoring) task, and the task
> transits to running, does the store handle become invalid and a new one
> must be retrieved? Or can we "switch it out" underneath -- for this
> case, how does the user know when they start to query the up-to-date state?
> 
> 12) Standby tasks will have the store open in regular mode, while
> active(restoring) tasks open stores in "upgrade mode" for more efficient
> bulk loading. When we switch the store into active mode, we close it and
> reopen it. What is the impact if we query the store during restore? What
> is the impact if we 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-22 Thread Matthias J. Sax
One more thought:

14) Is specifying the allowed lag in number of records a useful way for
users to declare how stale an instance is allowed to be? Would it be
more intuitive for users to specify the allowed lag in time units (would
event time or processing time be better)? It seems hard for users to
reason how "fresh" a store really is when number of records is used.


-Matthias

On 10/21/19 9:02 PM, Matthias J. Sax wrote:
> Some more follow up thoughts:
> 
> 11) If we get a store handle of an active(restoring) task, and the task
> transits to running, does the store handle become invalid and a new one
> must be retrieved? Or can we "switch it out" underneath -- for this
> case, how does the user know when they start to query the up-to-date state?
> 
> 12) Standby tasks will have the store open in regular mode, while
> active(restoring) tasks open stores in "upgrade mode" for more efficient
> bulk loading. When we switch the store into active mode, we close it and
> reopen it. What is the impact if we query the store during restore? What
> is the impact if we close the store to transit to running (eg, there
> might be open iterators)?
> 
> 13) Do we need to introduced new exception types? Compare KIP-216
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors)
> that aims to improve the user experience with regard to IQ exceptions.
> 
> 
> -Matthias
> 
> On 10/21/19 6:39 PM, Matthias J. Sax wrote:
>> Thanks for the KIP.
>>
>> Couple of comments:
>>
>> 1) With regard to KIP-441, my current understanding is that the lag
>> information is only reported to the leader (please correct me if I am
>> wrong). This seems to be quite a limitation to actually use the lag
>> information.
>>
>> 2) The idea of the metadata API is actually to get metadata once and
>> only refresh the metadata if a store was migrated. The current proposal
>> would require to get the metadata before each query. The KIP should
>> describe the usage pattern and impact in more detail.
>>
>> 3) Currently, the KIP does not list the public API changes in detail.
>> Please list all methods you intend to deprecate and list all methods you
>> intend to add (best, using a code-block markup -- compare
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements
>> as an example)
>>
>> 4) Also note (as already pointed out by John), that we cannot have any
>> breaking API changes. Thus, the API should be designed in a fully
>> backward compatible manner.
>>
>> 5) Returning a list of metadata object makes it hard for user to know if
>> the first object refers to the active(processing), active(restoring), or
>> a standby task. IMHO, we should be more explicit. For example, a
>> metadata object could have a flag that one can test via `#isActive()`.
>> Or maybe even better, we could keep the current API as-is and add
>> something like `standbyMetadataForKey()` (and similar methods for
>> other). Having just a flag `isActive()` is a little subtle and having
>> new overloads would make the API much clearer (passing in a boolean flag
>> does not seem to be a nice API).
>>
>> 6) Do you intent to return all standby metadata information at once,
>> similar to `allMetadata()` -- seems to be useful.
>>
>> 7) Even if the lag information is propagated to all instances, it will
>> happen in an async manner. Hence, I am wondering if we should address
>> this race condition (I think we should). The idea would be to check if a
>> standby/active(restoring) task is actually still within the lag bounds
>> when a query is executed and we would throw an exception if not.
>>
>> 8) The current `KafkaStreams#state()` method only returns a handle to
>> stores of active(processing) tasks. How can a user actually get a handle
>> to an store of an active(restoring) or standby task for querying? Seems
>> we should add a new method to get standby handles? Changing the
>> semantics to existing `state()` would be possible, but I think adding a
>> new method is preferable?
>>
>> 9) How does the user actually specify the acceptable lag? A global
>> config via StreamsConfig (this would be a public API change that needs
>> to be covered in the KIP)? Or on a per-store or even per-query basis for
>> more flexibility? We could also have a global setting that is used as
>> default and allow to overwrite it on a per-query basis.
>>
>> 10) Do we need to distinguish between active(restoring) and standby
>> tasks? Or could be treat both as the same?
>>
>>
>>
>>
>> -Matthias
>>
>>
>> On 10/21/19 5:40 PM, Vinoth Chandar wrote:
> I'm wondering, rather than putting "acceptable lag" into the
>>> configuration at all, or even making it a parameter on `allMetadataForKey`,
>>> why not just _always_ return all available metadata (including
>>> active/standby or lag) and let the caller decide to which node they want to
>>> route the query?
>>> +1 on exposing lag 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-21 Thread Matthias J. Sax
Some more follow up thoughts:

11) If we get a store handle of an active(restoring) task, and the task
transits to running, does the store handle become invalid and a new one
must be retrieved? Or can we "switch it out" underneath -- for this
case, how does the user know when they start to query the up-to-date state?

12) Standby tasks will have the store open in regular mode, while
active(restoring) tasks open stores in "upgrade mode" for more efficient
bulk loading. When we switch the store into active mode, we close it and
reopen it. What is the impact if we query the store during restore? What
is the impact if we close the store to transit to running (eg, there
might be open iterators)?

13) Do we need to introduce new exception types? Compare KIP-216
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors)
that aims to improve the user experience with regard to IQ exceptions.


-Matthias

On 10/21/19 6:39 PM, Matthias J. Sax wrote:
> Thanks for the KIP.
> 
> Couple of comments:
> 
> 1) With regard to KIP-441, my current understanding is that the lag
> information is only reported to the leader (please correct me if I am
> wrong). This seems to be quite a limitation to actually use the lag
> information.
> 
> 2) The idea of the metadata API is actually to get metadata once and
> only refresh the metadata if a store was migrated. The current proposal
> would require to get the metadata before each query. The KIP should
> describe the usage pattern and impact in more detail.
> 
> 3) Currently, the KIP does not list the public API changes in detail.
> Please list all methods you intend to deprecate and list all methods you
> intend to add (best, using a code-block markup -- compare
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements
> as an example)
> 
> 4) Also note (as already pointed out by John), that we cannot have any
> breaking API changes. Thus, the API should be designed in a fully
> backward compatible manner.
> 
> 5) Returning a list of metadata object makes it hard for user to know if
> the first object refers to the active(processing), active(restoring), or
> a standby task. IMHO, we should be more explicit. For example, a
> metadata object could have a flag that one can test via `#isActive()`.
> Or maybe even better, we could keep the current API as-is and add
> something like `standbyMetadataForKey()` (and similar methods for
> other). Having just a flag `isActive()` is a little subtle and having
> new overloads would make the API much clearer (passing in a boolean flag
> does not seem to be a nice API).
> 
> 6) Do you intent to return all standby metadata information at once,
> similar to `allMetadata()` -- seems to be useful.
> 
> 7) Even if the lag information is propagated to all instances, it will
> happen in an async manner. Hence, I am wondering if we should address
> this race condition (I think we should). The idea would be to check if a
> standby/active(restoring) task is actually still within the lag bounds
> when a query is executed and we would throw an exception if not.
> 
> 8) The current `KafkaStreams#state()` method only returns a handle to
> stores of active(processing) tasks. How can a user actually get a handle
> to an store of an active(restoring) or standby task for querying? Seems
> we should add a new method to get standby handles? Changing the
> semantics to existing `state()` would be possible, but I think adding a
> new method is preferable?
> 
> 9) How does the user actually specify the acceptable lag? A global
> config via StreamsConfig (this would be a public API change that needs
> to be covered in the KIP)? Or on a per-store or even per-query basis for
> more flexibility? We could also have a global setting that is used as
> default and allow to overwrite it on a per-query basis.
> 
> 10) Do we need to distinguish between active(restoring) and standby
> tasks? Or could be treat both as the same?
> 
> 
> 
> 
> -Matthias
> 
> 
> On 10/21/19 5:40 PM, Vinoth Chandar wrote:
 I'm wondering, rather than putting "acceptable lag" into the
>> configuration at all, or even making it a parameter on `allMetadataForKey`,
>> why not just _always_ return all available metadata (including
>> active/standby or lag) and let the caller decide to which node they want to
>> route the query?
>> +1 on exposing lag information via the APIs. IMO without having
>> continuously updated/fresh lag information, its true value as a signal for
>> query routing decisions is much limited. But we can design the API around
>> this model and iterate? Longer term, we should have continuously shared lag
>> information.
>>
 more general to refactor it to "allMetadataForKey(long
>> tolerableDataStaleness, ...)", and when it's set to 0 it means "active task
>> only".
>> +1 IMO if we plan on having `enableReplicaServing`, it makes sense to
>> generalize 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-21 Thread Matthias J. Sax
Thanks for the KIP.

Couple of comments:

1) With regard to KIP-441, my current understanding is that the lag
information is only reported to the leader (please correct me if I am
wrong). This seems to be quite a limitation to actually use the lag
information.

2) The idea of the metadata API is actually to get metadata once and
only refresh the metadata if a store was migrated. The current proposal
would require to get the metadata before each query. The KIP should
describe the usage pattern and impact in more detail.

3) Currently, the KIP does not list the public API changes in detail.
Please list all methods you intend to deprecate and list all methods you
intend to add (best, using a code-block markup -- compare
https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements
as an example)

4) Also note (as already pointed out by John), that we cannot have any
breaking API changes. Thus, the API should be designed in a fully
backward compatible manner.

5) Returning a list of metadata objects makes it hard for users to know if
the first object refers to the active(processing), active(restoring), or
a standby task. IMHO, we should be more explicit. For example, a
metadata object could have a flag that one can test via `#isActive()`.
Or maybe even better, we could keep the current API as-is and add
something like `standbyMetadataForKey()` (and similar methods for
other). Having just a flag `isActive()` is a little subtle and having
new overloads would make the API much clearer (passing in a boolean flag
does not seem to be a nice API).
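
For illustration, a rough sketch of what the "explicit overloads" option could
look like (the interface name and signatures below are hypothetical, not the
proposed API):

    import java.util.Collection;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.streams.state.StreamsMetadata;

    interface StandbyAwareLookup {
        // existing behavior: metadata for the active (processing) copy only
        <K> StreamsMetadata metadataForKey(String storeName, K key, Serializer<K> keySerializer);

        // new, explicit lookup for standby (and possibly restoring) copies
        <K> Collection<StreamsMetadata> standbyMetadataForKey(String storeName, K key, Serializer<K> keySerializer);
    }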

6) Do you intend to return all standby metadata information at once,
similar to `allMetadata()` -- seems to be useful.

7) Even if the lag information is propagated to all instances, it will
happen in an async manner. Hence, I am wondering if we should address
this race condition (I think we should). The idea would be to check if a
standby/active(restoring) task is actually still within the lag bounds
when a query is executed and we would throw an exception if not.

8) The current `KafkaStreams#store()` method only returns a handle to
stores of active(processing) tasks. How can a user actually get a handle
to a store of an active(restoring) or standby task for querying? Seems
we should add a new method to get standby handles? Changing the
semantics of the existing `store()` would be possible, but I think adding a
new method is preferable?

9) How does the user actually specify the acceptable lag? A global
config via StreamsConfig (this would be a public API change that needs
to be covered in the KIP)? Or on a per-store or even per-query basis for
more flexibility? We could also have a global setting that is used as
default and allow to overwrite it on a per-query basis.
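
To make the "global default plus per-query override" idea concrete, a hedged
sketch (the config key and the lag-bounded lookup below are invented for the
example, not part of this proposal):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    final class AcceptableLagConfigExample {
        static Properties streamsConfig() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "iq-demo");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // hypothetical global default: any copy within 1000 offsets may serve reads
            props.put("acceptable.lag.max.offsets", "1000");
            return props;
        }
        // a per-query override could then be an extra argument on the lookup, e.g.
        // allMetadataForKey(storeName, key, keySerializer, /* maxLagOffsets */ 100L)
    }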

10) Do we need to distinguish between active(restoring) and standby
tasks? Or could we treat both as the same?




-Matthias


On 10/21/19 5:40 PM, Vinoth Chandar wrote:
>>> I'm wondering, rather than putting "acceptable lag" into the
> configuration at all, or even making it a parameter on `allMetadataForKey`,
> why not just _always_ return all available metadata (including
> active/standby or lag) and let the caller decide to which node they want to
> route the query?
> +1 on exposing lag information via the APIs. IMO without having
> continuously updated/fresh lag information, its true value as a signal for
> query routing decisions is much limited. But we can design the API around
> this model and iterate? Longer term, we should have continuously shared lag
> information.
> 
>>> more general to refactor it to "allMetadataForKey(long
> tolerableDataStaleness, ...)", and when it's set to 0 it means "active task
> only".
> +1 IMO if we plan on having `enableReplicaServing`, it makes sense to
> generalize based on dataStaleness. This seems complementary to exposing the
> lag information itself.
> 
>>> This is actually not a public api change at all, and I'm planning to
> implement it asap as a precursor to the rest of KIP-441
> +1 again. Do we have a concrete timeline for when this change will land on
> master? I would like to get the implementation wrapped up (as much as
> possible) by end of the month. :). But I agree this sequencing makes
> sense..
> 
> 
> On Mon, Oct 21, 2019 at 2:56 PM Guozhang Wang  wrote:
> 
>> Hi Navinder,
>>
>> Thanks for the KIP, I have a high level question about the proposed API
>> regarding:
>>
>> "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing...)"
>>
>> I'm wondering if it's more general to refactor it to
>> "allMetadataForKey(long tolerableDataStaleness, ...)", and when it's set to
>> 0 it means "active task only". Behind the scene, we can have the committed
>> offsets to encode the stream time as well, so that when processing standby
>> tasks the stream process knows not long the lag in terms of offsets
>> comparing to the committed offset (internally we call it offset limit), but
>> also the 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-21 Thread Vinoth Chandar
>>I'm wondering, rather than putting "acceptable lag" into the
configuration at all, or even making it a parameter on `allMetadataForKey`,
why not just _always_ return all available metadata (including
active/standby or lag) and let the caller decide to which node they want to
route the query?
+1 on exposing lag information via the APIs. IMO without having
continuously updated/fresh lag information, its true value as a signal for
query routing decisions is much limited. But we can design the API around
this model and iterate? Longer term, we should have continuously shared lag
information.

>>more general to refactor it to "allMetadataForKey(long
tolerableDataStaleness, ...)", and when it's set to 0 it means "active task
only".
+1 IMO if we plan on having `enableReplicaServing`, it makes sense to
generalize based on dataStaleness. This seems complementary to exposing the
lag information itself.

>>This is actually not a public api change at all, and I'm planning to
implement it asap as a precursor to the rest of KIP-441
+1 again. Do we have a concrete timeline for when this change will land on
master? I would like to get the implementation wrapped up (as much as
possible) by end of the month. :). But I agree this sequencing makes
sense..


On Mon, Oct 21, 2019 at 2:56 PM Guozhang Wang  wrote:

> Hi Navinder,
>
> Thanks for the KIP, I have a high level question about the proposed API
> regarding:
>
> "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing...)"
>
> I'm wondering if it's more general to refactor it to
> "allMetadataForKey(long tolerableDataStaleness, ...)", and when it's set to
> 0 it means "active task only". Behind the scene, we can have the committed
> offsets to encode the stream time as well, so that when processing standby
> tasks the stream process knows not long the lag in terms of offsets
> comparing to the committed offset (internally we call it offset limit), but
> also the lag in terms of timestamp diff comparing the committed offset.
>
> Also encoding the timestamp as part of offset have other benefits for
> improving Kafka Streams time semantics as well, but for KIP-535 itself I
> think it can help giving users a more intuitive interface to reason about.
>
>
> Guozhang
>
> On Mon, Oct 21, 2019 at 12:30 PM John Roesler  wrote:
>
> > Hey Navinder,
> >
> > Thanks for the KIP! I've been reading over the discussion thus far,
> > and I have a couple of thoughts to pile on as well:
> >
> > It seems confusing to propose the API in terms of the current system
> > state, but also propose how the API would look if/when KIP-441 is
> > implemented. It occurs to me that the only part of KIP-441 that would
> > affect you is the availability of the lag information in the
> > SubscriptionInfo message. This is actually not a public api change at
> > all, and I'm planning to implement it asap as a precursor to the rest
> > of KIP-441, so maybe you can just build on top of KIP-441 and assume
> > the lag information will be available. Then you could have a more
> > straightforward proposal (e.g., mention that you'd return the lag
> > information in AssignmentInfo as well as in the StreamsMetadata in
> > some form, or make use of it in the API somehow).
> >
> > I'm partially motivated in that former point because it seems like
> > understanding how callers would bound the staleness for their use case
> > is _the_ key point for this KIP. FWIW, I think that adding a new
> > method to StreamsMetadataState and deprecating the existing method is
> > the best way to go; we just can't change the return types of any
> > existing methods.
> >
> > I'm wondering, rather than putting "acceptable lag" into the
> > configuration at all, or even making it a parameter on
> > `allMetadataForKey`, why not just _always_ return all available
> > metadata (including active/standby or lag) and let the caller decide
> > to which node they want to route the query? This method isn't making
> > any queries itself; it's merely telling you where the local Streams
> > instance _thinks_ the key in question is located. Just returning all
> > available information lets the caller implement any semantics they
> > desire around querying only active stores, or standbys, or recovering
> > stores, or whatever.
> >
> > One fly in the ointment, which you may wish to consider if proposing
> > to use lag information, is that the cluster would only become aware of
> > new lag information during rebalances. Even in the full expression of
> > KIP-441, this information would stop being propagated when the cluster
> > achieves a balanced task distribution. There was a follow-on idea I
> > POCed to continuously share lag information in the heartbeat protocol,
> > which you might be interested in, if you want to make sure that nodes
> > are basically _always_ aware of each others' lag on different
> > partitions: https://github.com/apache/kafka/pull/7096
> >
> > Thanks again!
> > -John
> >
> >
> > On Sat, Oct 19, 2019 at 6:06 AM 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-21 Thread Guozhang Wang
Hi Navinder,

Thanks for the KIP, I have a high level question about the proposed API
regarding:

"StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing...)"

I'm wondering if it's more general to refactor it to
"allMetadataForKey(long tolerableDataStaleness, ...)", where setting it to
0 means "active task only". Behind the scenes, we can have the committed
offsets encode the stream time as well, so that when processing standby
tasks the stream process knows not only the lag in terms of offsets
compared to the committed offset (internally we call it the offset limit), but
also the lag in terms of timestamp diff compared to the committed offset.
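
A hypothetical sketch of that signature, just to make the suggestion concrete
(0 = active task only; larger values widen the set of copies allowed to serve):

    import java.util.Collection;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.streams.state.StreamsMetadata;

    interface StalenessBoundedLookup {
        // return every copy (active or standby) whose lag is within the given bound
        <K> Collection<StreamsMetadata> allMetadataForKey(long tolerableDataStaleness,
                                                          String storeName,
                                                          K key,
                                                          Serializer<K> keySerializer);
    }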

Also, encoding the timestamp as part of the offset has other benefits for
improving Kafka Streams time semantics as well, but for KIP-535 itself I
think it can help give users a more intuitive interface to reason about.


Guozhang

On Mon, Oct 21, 2019 at 12:30 PM John Roesler  wrote:

> Hey Navinder,
>
> Thanks for the KIP! I've been reading over the discussion thus far,
> and I have a couple of thoughts to pile on as well:
>
> It seems confusing to propose the API in terms of the current system
> state, but also propose how the API would look if/when KIP-441 is
> implemented. It occurs to me that the only part of KIP-441 that would
> affect you is the availability of the lag information in the
> SubscriptionInfo message. This is actually not a public api change at
> all, and I'm planning to implement it asap as a precursor to the rest
> of KIP-441, so maybe you can just build on top of KIP-441 and assume
> the lag information will be available. Then you could have a more
> straightforward proposal (e.g., mention that you'd return the lag
> information in AssignmentInfo as well as in the StreamsMetadata in
> some form, or make use of it in the API somehow).
>
> I'm partially motivated in that former point because it seems like
> understanding how callers would bound the staleness for their use case
> is _the_ key point for this KIP. FWIW, I think that adding a new
> method to StreamsMetadataState and deprecating the existing method is
> the best way to go; we just can't change the return types of any
> existing methods.
>
> I'm wondering, rather than putting "acceptable lag" into the
> configuration at all, or even making it a parameter on
> `allMetadataForKey`, why not just _always_ return all available
> metadata (including active/standby or lag) and let the caller decide
> to which node they want to route the query? This method isn't making
> any queries itself; it's merely telling you where the local Streams
> instance _thinks_ the key in question is located. Just returning all
> available information lets the caller implement any semantics they
> desire around querying only active stores, or standbys, or recovering
> stores, or whatever.
>
> One fly in the ointment, which you may wish to consider if proposing
> to use lag information, is that the cluster would only become aware of
> new lag information during rebalances. Even in the full expression of
> KIP-441, this information would stop being propagated when the cluster
> achieves a balanced task distribution. There was a follow-on idea I
> POCed to continuously share lag information in the heartbeat protocol,
> which you might be interested in, if you want to make sure that nodes
> are basically _always_ aware of each others' lag on different
> partitions: https://github.com/apache/kafka/pull/7096
>
> Thanks again!
> -John
>
>
> On Sat, Oct 19, 2019 at 6:06 AM Navinder Brar
>  wrote:
> >
> > Thanks, Vinoth. Looks like we are on the same page. I will add some of
> these explanations to the KIP as well. Have assigned the KAFKA-6144 to
> myself and KAFKA-8994 is closed(by you). As suggested, we will replace
> "replica" with "standby".
> >
> > In the new API, "StreamsMetadataState::allMetadataForKey(boolean
> enableReplicaServing, String storeName, K key, Serializer
> keySerializer)" Do we really need a per key configuration? or a new
> StreamsConfig is good enough?>> Coming from experience, when teams are
> building a platform with Kafka Streams and these API's serve data to
> multiple teams, we can't have a generalized config that says as a platform
> we will support stale reads or not. It should be the choice of someone who
> is calling the API's to choose whether they are ok with stale reads or not.
> Makes sense?
> > On Thursday, 17 October, 2019, 11:56:02 pm IST, Vinoth Chandar <
> vchan...@confluent.io> wrote:
> >
> >  Looks like we are covering ground :)
> >
> > >>Only if it is within a permissible  range(say 1) we will serve from
> > Restoring state of active.
> > +1 on having a knob like this.. My reasoning is as follows.
> >
> > Looking at the Streams state as a read-only distributed kv store. With
> > num_standby = f , we should be able to tolerate f failures and if there
> is
> > a f+1' failure, the system should be unavailable.
> >
> > A) So with num_standby=0, the system 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-21 Thread John Roesler
Hey Navinder,

Thanks for the KIP! I've been reading over the discussion thus far,
and I have a couple of thoughts to pile on as well:

It seems confusing to propose the API in terms of the current system
state, but also propose how the API would look if/when KIP-441 is
implemented. It occurs to me that the only part of KIP-441 that would
affect you is the availability of the lag information in the
SubscriptionInfo message. This is actually not a public api change at
all, and I'm planning to implement it asap as a precursor to the rest
of KIP-441, so maybe you can just build on top of KIP-441 and assume
the lag information will be available. Then you could have a more
straightforward proposal (e.g., mention that you'd return the lag
information in AssignmentInfo as well as in the StreamsMetadata in
some form, or make use of it in the API somehow).

I'm partially motivated in that former point because it seems like
understanding how callers would bound the staleness for their use case
is _the_ key point for this KIP. FWIW, I think that adding a new
method to StreamsMetadataState and deprecating the existing method is
the best way to go; we just can't change the return types of any
existing methods.

I'm wondering, rather than putting "acceptable lag" into the
configuration at all, or even making it a parameter on
`allMetadataForKey`, why not just _always_ return all available
metadata (including active/standby or lag) and let the caller decide
to which node they want to route the query? This method isn't making
any queries itself; it's merely telling you where the local Streams
instance _thinks_ the key in question is located. Just returning all
available information lets the caller implement any semantics they
desire around querying only active stores, or standbys, or recovering
stores, or whatever.
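
To make that concrete, a sketch of caller-side routing over whatever the lookup
returns (the Candidate shape and lag field below are assumptions for the
example, not the final API):

    import java.util.Comparator;
    import java.util.List;
    import java.util.Optional;

    final class StaleReadRouter {
        static final class Candidate {
            final String endpoint;     // host:port to route the query to
            final boolean active;      // true for the active (processing) copy
            final long lagInOffsets;   // reported lag (0 for a caught-up active)
            Candidate(final String endpoint, final boolean active, final long lagInOffsets) {
                this.endpoint = endpoint;
                this.active = active;
                this.lagInOffsets = lagInOffsets;
            }
        }

        // Prefer the active copy; otherwise pick the least-lagging standby within the bound.
        static Optional<Candidate> pick(final List<Candidate> candidates, final long maxAcceptableLag) {
            return candidates.stream()
                .filter(c -> c.active || c.lagInOffsets <= maxAcceptableLag)
                .min(Comparator.comparing((Candidate c) -> !c.active)
                               .thenComparingLong(c -> c.lagInOffsets));
        }
    }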

One fly in the ointment, which you may wish to consider if proposing
to use lag information, is that the cluster would only become aware of
new lag information during rebalances. Even in the full expression of
KIP-441, this information would stop being propagated when the cluster
achieves a balanced task distribution. There was a follow-on idea I
POCed to continuously share lag information in the heartbeat protocol,
which you might be interested in, if you want to make sure that nodes
are basically _always_ aware of each others' lag on different
partitions: https://github.com/apache/kafka/pull/7096

Thanks again!
-John


On Sat, Oct 19, 2019 at 6:06 AM Navinder Brar
 wrote:
>
> Thanks, Vinoth. Looks like we are on the same page. I will add some of these 
> explanations to the KIP as well. Have assigned the KAFKA-6144 to myself and 
> KAFKA-8994 is closed(by you). As suggested, we will replace "replica" with 
> "standby".
>
> In the new API, "StreamsMetadataState::allMetadataForKey(boolean 
> enableReplicaServing, String storeName, K key, Serializer keySerializer)" 
> Do we really need a per key configuration? or a new StreamsConfig is good 
> enough?>> Coming from experience, when teams are building a platform with 
> Kafka Streams and these API's serve data to multiple teams, we can't have a 
> generalized config that says as a platform we will support stale reads or 
> not. It should be the choice of someone who is calling the API's to choose 
> whether they are ok with stale reads or not. Makes sense?
> On Thursday, 17 October, 2019, 11:56:02 pm IST, Vinoth Chandar 
>  wrote:
>
>  Looks like we are covering ground :)
>
> >>Only if it is within a permissible  range(say 1) we will serve from
> Restoring state of active.
> +1 on having a knob like this.. My reasoning is as follows.
>
> Looking at the Streams state as a read-only distributed kv store. With
> num_standby = f , we should be able to tolerate f failures and if there is
> a f+1' failure, the system should be unavailable.
>
> A) So with num_standby=0, the system should be unavailable even if there is
> 1 failure and thats my argument for not allowing querying in restoration
> state, esp in this case it will be a total rebuild of the state (which IMO
> cannot be considered a normal fault free operational state).
>
> B) Even there are standby's, say num_standby=2, if the user decides to shut
> down all 3 instances, then only outcome should be unavailability until all
> of them come back or state is rebuilt on other nodes in the cluster. In
> normal operations, f <= 2 and when a failure does happen we can then either
> choose to be C over A and fail IQs until replication is fully caught up or
> choose A over C by serving in restoring state as long as lag is minimal. If
> even with f=1 say, all the standbys are lagging a lot due to some issue,
> then that should be considered a failure since that is different from
> normal/expected operational mode. Serving reads with unbounded replication
> lag and calling it "available" may not be very usable or even desirable :)
> IMHO, since it gives the user no way to reason about the app that is going
> to 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-19 Thread Navinder Brar
Thanks, Vinoth. Looks like we are on the same page. I will add some of these
explanations to the KIP as well. I have assigned KAFKA-6144 to myself, and
KAFKA-8994 is closed (by you). As suggested, we will replace "replica" with
"standby".

In the new API, "StreamsMetadataState::allMetadataForKey(boolean
enableReplicaServing, String storeName, K key, Serializer<K> keySerializer)" -- do
we really need a per-key configuration? Or is a new StreamsConfig good
enough?>> Coming from experience, when teams are building a platform with Kafka
Streams and these APIs serve data to multiple teams, we can't have a
generalized config that says that, as a platform, we will support stale reads or not.
It should be the choice of whoever is calling the APIs whether
they are OK with stale reads or not. Makes sense?
On Thursday, 17 October, 2019, 11:56:02 pm IST, Vinoth Chandar 
 wrote:  
 
 Looks like we are covering ground :)

>>Only if it is within a permissible  range(say 1) we will serve from
Restoring state of active.
+1 on having a knob like this.. My reasoning is as follows.

Looking at the Streams state as a read-only distributed kv store. With
num_standby = f , we should be able to tolerate f failures and if there is
a f+1' failure, the system should be unavailable.

A) So with num_standby=0, the system should be unavailable even if there is
1 failure and thats my argument for not allowing querying in restoration
state, esp in this case it will be a total rebuild of the state (which IMO
cannot be considered a normal fault free operational state).

B) Even there are standby's, say num_standby=2, if the user decides to shut
down all 3 instances, then only outcome should be unavailability until all
of them come back or state is rebuilt on other nodes in the cluster. In
normal operations, f <= 2 and when a failure does happen we can then either
choose to be C over A and fail IQs until replication is fully caught up or
choose A over C by serving in restoring state as long as lag is minimal. If
even with f=1 say, all the standbys are lagging a lot due to some issue,
then that should be considered a failure since that is different from
normal/expected operational mode. Serving reads with unbounded replication
lag and calling it "available" may not be very usable or even desirable :)
IMHO, since it gives the user no way to reason about the app that is going
to query this store.

So there is definitely a need to distinguish between :  Replication catchup
while being in fault free state vs Restoration of state when we lose more
than f standbys. This knob is a great starting point towards this.

If you agree with some of the explanation above, please feel free to
include it in the KIP as well since this is sort of our design principle
here..

Small nits :

- let's standardize on "standby" instead of "replica", KIP or code,  to be
consistent with rest of Streams code/docs?
- Can we merge KAFKA-8994 into KAFKA-6144 now and close the former?
Eventually need to consolidate KAFKA-6555 as well
- In the new API, "StreamsMetadataState::allMetadataForKey(boolean
enableReplicaServing, String storeName, K key, Serializer keySerializer)" Do
we really need a per key configuration? or a new StreamsConfig is good
enough?

On Wed, Oct 16, 2019 at 8:31 PM Navinder Brar
 wrote:

> @Vinoth, I have incorporated a few of the discussions we have had in the
> KIP.
>
> In the current code, t0 and t1 serve queries from Active(Running)
> partition. For case t2, we are planning to return List
> such that it returns  so that if IQ
> fails on A, the replica on B can serve the data by enabling serving from
> replicas. This still does not solve case t3 and t4 since B has been
> promoted to active but it is in Restoring state to catchup till A’s last
> committed position as we don’t serve from Restoring state in Active and new
> Replica on R is building itself from scratch. Both these cases can be
> solved if we start serving from Restoring state of active as well since it
> is almost equivalent to previous Active.
>
> There could be a case where all replicas of a partition become unavailable
> and active and all replicas of that partition are building themselves from
> scratch, in this case, the state in Active is far behind even though it is
> in Restoring state. To cater to such cases that we don’t serve from this
> state we can either add another state before Restoring or check the
> difference between last committed offset and current position. Only if it
> is within a permissible range (say 1) we will serve from Restoring the
> state of Active.
>
>
>    On Wednesday, 16 October, 2019, 10:01:35 pm IST, Vinoth Chandar <
> vchan...@confluent.io> wrote:
>
>  Thanks for the updates on the KIP, Navinder!
>
> Few comments
>
> - AssignmentInfo is not public API?. But we will change it and thus need to
> increment the version and test for version_probing etc. Good to separate
> that from StreamsMetadata changes (which is public API)
> - 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-17 Thread Vinoth Chandar
Looks like we are covering ground :)

>>Only if it is within a permissible  range(say 1) we will serve from
Restoring state of active.
+1 on having a knob like this.. My reasoning is as follows.

Looking at the Streams state as a read-only distributed kv store: with
num_standby = f, we should be able to tolerate f failures, and if there is
an (f+1)th failure, the system should be unavailable.

A) So with num_standby=0, the system should be unavailable even if there is
1 failure, and that's my argument for not allowing querying in restoration
state, especially since in this case it will be a total rebuild of the state (which IMO
cannot be considered a normal, fault-free operational state).

B) Even if there are standbys, say num_standby=2, if the user decides to shut
down all 3 instances, then the only outcome should be unavailability until all
of them come back or state is rebuilt on other nodes in the cluster. In
normal operations, f <= 2, and when a failure does happen we can then either
choose C over A and fail IQs until replication is fully caught up, or
choose A over C by serving in restoring state as long as lag is minimal. If,
even with f=1 say, all the standbys are lagging a lot due to some issue,
then that should be considered a failure, since that is different from the
normal/expected operational mode. Serving reads with unbounded replication
lag and calling it "available" may not be very usable or even desirable :)
IMHO, since it gives the user no way to reason about the app that is going
to query this store.

So there is definitely a need to distinguish between: replication catchup
while being in a fault-free state vs. restoration of state when we lose more
than f standbys. This knob is a great starting point towards this.

If you agree with some of the explanation above, please feel free to
include it in the KIP as well since this is sort of our design principle
here..

Small nits :

- let's standardize on "standby" instead of "replica", in the KIP and in code, to be
consistent with the rest of the Streams code/docs?
- Can we merge KAFKA-8994 into KAFKA-6144 now and close the former?
Eventually we need to consolidate KAFKA-6555 as well
- In the new API, "StreamsMetadataState::allMetadataForKey(boolean
enableReplicaServing, String storeName, K key, Serializer<K> keySerializer)" -- do
we really need a per-key configuration? Or is a new StreamsConfig good
enough?

On Wed, Oct 16, 2019 at 8:31 PM Navinder Brar
 wrote:

> @Vinoth, I have incorporated a few of the discussions we have had in the
> KIP.
>
> In the current code, t0 and t1 serve queries from Active(Running)
> partition. For case t2, we are planning to return List
> such that it returns  so that if IQ
> fails on A, the replica on B can serve the data by enabling serving from
> replicas. This still does not solve case t3 and t4 since B has been
> promoted to active but it is in Restoring state to catchup till A’s last
> committed position as we don’t serve from Restoring state in Active and new
> Replica on R is building itself from scratch. Both these cases can be
> solved if we start serving from Restoring state of active as well since it
> is almost equivalent to previous Active.
>
> There could be a case where all replicas of a partition become unavailable
> and active and all replicas of that partition are building themselves from
> scratch, in this case, the state in Active is far behind even though it is
> in Restoring state. To cater to such cases that we don’t serve from this
> state we can either add another state before Restoring or check the
> difference between last committed offset and current position. Only if it
> is within a permissible range (say 1) we will serve from Restoring the
> state of Active.
>
>
> On Wednesday, 16 October, 2019, 10:01:35 pm IST, Vinoth Chandar <
> vchan...@confluent.io> wrote:
>
>  Thanks for the updates on the KIP, Navinder!
>
> Few comments
>
> - AssignmentInfo is not public API?. But we will change it and thus need to
> increment the version and test for version_probing etc. Good to separate
> that from StreamsMetadata changes (which is public API)
> - From what I see, there is going to be choice between the following
>
>   A) introducing a new *KafkaStreams::allMetadataForKey() *API that
> potentially returns List ordered from most upto date to
> least upto date replicas. Today we cannot fully implement this ordering,
> since all we know is which hosts are active and which are standbys.
> However, this aligns well with the future. KIP-441 adds the lag information
> to the rebalancing protocol. We could also sort replicas based on the
> report lags eventually. This is fully backwards compatible with existing
> clients. Only drawback I see is the naming of the existing method
> KafkaStreams::metadataForKey, not conveying the distinction that it simply
> returns the active replica i.e allMetadataForKey.get(0).
>  B) Change KafkaStreams::metadataForKey() to return a List. Its a breaking
> change.
>
> I prefer A, since none of the 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-16 Thread Navinder Brar
@Vinoth, I have incorporated a few of the discussions we have had in the KIP. 

In the current code, t0 and t1 serve queries from the Active(Running) partition.
For case t2, we are planning to return a List<StreamsMetadata> (active first,
followed by standbys) so that if IQ fails on A, the replica on B can serve the
data by enabling serving from replicas. This still does not solve cases t3 and
t4, since B has been promoted to active but is in Restoring state to catch up
to A's last committed position (we don't serve from the Restoring state of an
active), and the new replica on R is building itself from scratch. Both these
cases can be solved if we start serving from the Restoring state of the active
as well, since it is almost equivalent to the previous Active.

There could be a case where the active and all replicas of a partition become
unavailable and are building themselves from scratch; in this case, the state
in the Active is far behind even though it is in Restoring state. To make sure
we don't serve from this state in such cases, we can either add another state
before Restoring, or check the difference between the last committed offset and
the current position. Only if it is within a permissible range (say 1) will we
serve from the Restoring state of the Active.
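
For concreteness, the check described above could be as simple as the sketch
below (committedOffset, currentPosition, and permissibleLag are assumed names
for the example, not actual Streams internals):

    final class RestoringActiveCheck {
        // Serve from a restoring active only if it is within `permissibleLag`
        // offsets of the last committed offset of its changelog partition.
        static boolean mayServe(final long committedOffset,
                                final long currentPosition,
                                final long permissibleLag) {
            final long lag = Math.max(0L, committedOffset - currentPosition);
            return lag <= permissibleLag;
        }
    }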


On Wednesday, 16 October, 2019, 10:01:35 pm IST, Vinoth Chandar 
 wrote:  
 
 Thanks for the updates on the KIP, Navinder!

Few comments

- AssignmentInfo is not public API?. But we will change it and thus need to
increment the version and test for version_probing etc. Good to separate
that from StreamsMetadata changes (which is public API)
- From what I see, there is going to be choice between the following

  A) introducing a new *KafkaStreams::allMetadataForKey() *API that
potentially returns List ordered from most upto date to
least upto date replicas. Today we cannot fully implement this ordering,
since all we know is which hosts are active and which are standbys.
However, this aligns well with the future. KIP-441 adds the lag information
to the rebalancing protocol. We could also sort replicas based on the
report lags eventually. This is fully backwards compatible with existing
clients. Only drawback I see is the naming of the existing method
KafkaStreams::metadataForKey, not conveying the distinction that it simply
returns the active replica i.e allMetadataForKey.get(0).
 B) Change KafkaStreams::metadataForKey() to return a List. Its a breaking
change.

I prefer A, since none of the semantics/behavior changes for existing
users. Love to hear more thoughts. Can we also work this into the KIP?
I already implemented A to unblock myself for now. Seems feasible to do.


On Tue, Oct 15, 2019 at 12:21 PM Vinoth Chandar 
wrote:

> >>I get your point. But suppose there is a replica which has just become
> active, so in that case replica will still be building itself from scratch
> and this active will go to restoring state till it catches up with previous
> active, wouldn't serving from a restoring active make more sense than a
> replica in such case.
>
> KIP-441 will change this behavior such that promotion to active happens
> based on how caught up a replica is. So, once we have that (work underway
> already for 2.5 IIUC) and user sets num.standby.replicas > 0, then the
> staleness window should not be that long as you describe. IMO if user wants
> availability for state, then should configure num.standby.replicas > 0. If
> not, then on a node loss, few partitions would be unavailable for a while
> (there are other ways to bring this window down, which I won't bring in
> here). We could argue for querying a restoring active (say a new node added
> to replace a faulty old node) based on AP vs CP principles. But not sure
> reading really really old values for the sake of availability is useful. No
> AP data system would be inconsistent for such a long time in practice.
>
> So, I still feel just limiting this to standby reads provides best
> semantics.
>
> Just my 2c. Would love to see what others think as well.
>
> On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar
>  wrote:
>
>> Hi Vinoth,
>> Thanks for the feedback.
>>  Can we link the JIRA, discussion thread also to the KIP.>> Added.
>> Based on the discussion on KAFKA-6144, I was under the impression that
>> this KIP is also going to cover exposing of the standby information in
>> StreamsMetadata and thus subsume KAFKA-8994 . That would require a public
>> API change?>> Sure, I can add changes for 8994 in this KIP and link
>> KAFKA-6144 to KAFKA-8994 as well.
>>  KIP seems to be focussing on restoration when a new node is added.
>> KIP-441 is underway and has some major changes proposed for this. It would
>> be good to clarify dependencies if any. Without KIP-441, I am not very sure
>> if we should allow reads from nodes in RESTORING state, which could amount
>> to many minutes/few hours of stale reads?  This is different from allowing
>> querying standby replicas, which could be mostly caught up and the
>> staleness 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-16 Thread Vinoth Chandar
Thanks for the updates on the KIP, Navinder!

Few comments

- AssignmentInfo is not public API, right? But we will change it and thus need to
increment the version and test for version_probing etc. Good to separate
that from the StreamsMetadata changes (which are public API).
- From what I see, there is going to be a choice between the following:

  A) introducing a new *KafkaStreams::allMetadataForKey()* API that
potentially returns a List<StreamsMetadata> ordered from the most up-to-date to
the least up-to-date replica. Today we cannot fully implement this ordering,
since all we know is which hosts are active and which are standbys.
However, this aligns well with the future: KIP-441 adds the lag information
to the rebalancing protocol, and we could also sort replicas based on the
reported lags eventually. This is fully backwards compatible with existing
clients. The only drawback I see is the naming of the existing method
KafkaStreams::metadataForKey, which does not convey that it simply
returns the active replica, i.e. allMetadataForKey().get(0).
  B) Changing KafkaStreams::metadataForKey() to return a List. It's a breaking
change.

I prefer A, since none of the semantics/behavior changes for existing
users. Love to hear more thoughts. Can we also work this into the KIP?
I already implemented A to unblock myself for now. Seems feasible to do.
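
To show option (A) from the caller's side, a rough sketch (allMetadataForKey()
is the proposed addition and does not exist yet, so it is modelled here as a
hypothetical interface; StreamsMetadata and Serdes are the existing classes):

    import java.util.List;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.streams.state.StreamsMetadata;

    interface ProposedMetadataApi {
        // ordered: index 0 is the active replica, standbys follow (eventually sorted by lag)
        <K> List<StreamsMetadata> allMetadataForKey(String storeName, K key, Serializer<K> keySerializer);
    }

    final class OptionAUsage {
        static void route(final ProposedMetadataApi streams) {
            final List<StreamsMetadata> replicas =
                streams.allMetadataForKey("word-counts", "kafka", Serdes.String().serializer());
            final StreamsMetadata active = replicas.get(0); // same as today's metadataForKey()
            final List<StreamsMetadata> standbys = replicas.subList(1, replicas.size());
            // try 'active' first; fall back to 'standbys' only if stale reads are acceptable
        }
    }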


On Tue, Oct 15, 2019 at 12:21 PM Vinoth Chandar 
wrote:

> >>I get your point. But suppose there is a replica which has just become
> active, so in that case replica will still be building itself from scratch
> and this active will go to restoring state till it catches up with previous
> active, wouldn't serving from a restoring active make more sense than a
> replica in such case.
>
> KIP-441 will change this behavior such that promotion to active happens
> based on how caught up a replica is. So, once we have that (work underway
> already for 2.5 IIUC) and user sets num.standby.replicas > 0, then the
> staleness window should not be that long as you describe. IMO if user wants
> availability for state, then should configure num.standby.replicas > 0. If
> not, then on a node loss, few partitions would be unavailable for a while
> (there are other ways to bring this window down, which I won't bring in
> here). We could argue for querying a restoring active (say a new node added
> to replace a faulty old node) based on AP vs CP principles. But not sure
> reading really really old values for the sake of availability is useful. No
> AP data system would be inconsistent for such a long time in practice.
>
> So, I still feel just limiting this to standby reads provides best
> semantics.
>
> Just my 2c. Would love to see what others think as well.
>
> On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar
>  wrote:
>
>> Hi Vinoth,
>> Thanks for the feedback.
>>  Can we link the JIRA, discussion thread also to the KIP.>> Added.
>> Based on the discussion on KAFKA-6144, I was under the impression that
>> this KIP is also going to cover exposing of the standby information in
>> StreamsMetadata and thus subsume KAFKA-8994 . That would require a public
>> API change?>> Sure, I can add changes for 8994 in this KIP and link
>> KAFKA-6144 to KAFKA-8994 as well.
>>   KIP seems to be focussing on restoration when a new node is added.
>> KIP-441 is underway and has some major changes proposed for this. It would
>> be good to clarify dependencies if any. Without KIP-441, I am not very sure
>> if we should allow reads from nodes in RESTORING state, which could amount
>> to many minutes/few hours of stale reads?  This is different from allowing
>> querying standby replicas, which could be mostly caught up and the
>> staleness window could be much smaller/tolerable. (once again the focus on
>> KAFKA-8994).>> I get your point. But suppose there is a replica which has
>> just become active, so in that case replica will still be building itself
>> from scratch and this active will go to restoring state till it catches up
>> with previous active, wouldn't serving from a restoring active make more
>> sense than a replica in such case.
>>
>> Finally, we may need to introduce a configuration to control this. Some
>> users may prefer errors to stale data. Can we also add it to the KIP?>>
>> Will add this.
>>
>> Regards,
>> Navinder
>>
>>
>> On2019/10/14 16:56:49, Vinoth Chandar wrote:
>>
>> >Hi Navinder,>
>>
>> >
>>
>> >Thanks for sharing the KIP! Few thoughts>
>>
>> >
>>
>> >- Can we link the JIRA, discussion thread also to the KIP>
>>
>> >- Based on the discussion on KAFKA-6144, I was under the impression
>> that>
>>
>> >this KIP is also going to cover exposing of the standby information in>
>>
>> >StreamsMetadata and thus subsume KAFKA-8994 . That would require a
>> public>
>>
>> >API change?>
>>
>> >- KIP seems to be focussing on restoration when a new node is added.>
>>
>> >KIP-441 is underway and has some major changes proposed for this. It
>> would>
>>
>> >be good to clarify dependencies if any. Without KIP-441, I am not very
>> sure>
>>
>> >if 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-15 Thread Vinoth Chandar
>>I get your point. But suppose there is a replica which has just become
active, so in that case replica will still be building itself from scratch
and this active will go to restoring state till it catches up with previous
active, wouldn't serving from a restoring active make more sense than a
replica in such case.

KIP-441 will change this behavior such that promotion to active happens
based on how caught up a replica is. So, once we have that (work underway
already for 2.5 IIUC) and the user sets num.standby.replicas > 0, the
staleness window should not be as long as you describe. IMO if a user wants
availability for state, they should configure num.standby.replicas > 0. If
not, then on a node loss, a few partitions would be unavailable for a while
(there are other ways to bring this window down, which I won't bring in
here). We could argue for querying a restoring active (say a new node added
to replace a faulty old node) based on AP vs CP principles. But I am not sure
reading really, really old values for the sake of availability is useful. No
AP data system would be inconsistent for such a long time in practice.

So, I still feel just limiting this to standby reads provides best
semantics.

Just my 2c. Would love to see what others think as well.
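
(For reference, enabling standbys is just the existing Streams config; the
snippet below keeps one warm copy of every store, with illustrative values.)

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    final class StandbyConfigExample {
        static Properties streamsConfig() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "iq-demo");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
            return props;
        }
    }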

On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar
 wrote:

> Hi Vinoth,
> Thanks for the feedback.
>  Can we link the JIRA, discussion thread also to the KIP.>> Added.
> Based on the discussion on KAFKA-6144, I was under the impression that
> this KIP is also going to cover exposing of the standby information in
> StreamsMetadata and thus subsume KAFKA-8994 . That would require a public
> API change?>> Sure, I can add changes for 8994 in this KIP and link
> KAFKA-6144 to KAFKA-8994 as well.
>   KIP seems to be focussing on restoration when a new node is added.
> KIP-441 is underway and has some major changes proposed for this. It would
> be good to clarify dependencies if any. Without KIP-441, I am not very sure
> if we should allow reads from nodes in RESTORING state, which could amount
> to many minutes/few hours of stale reads?  This is different from allowing
> querying standby replicas, which could be mostly caught up and the
> staleness window could be much smaller/tolerable. (once again the focus on
> KAFKA-8994).>> I get your point. But suppose there is a replica which has
> just become active, so in that case replica will still be building itself
> from scratch and this active will go to restoring state till it catches up
> with previous active, wouldn't serving from a restoring active make more
> sense than a replica in such case.
>
> Finally, we may need to introduce a configuration to control this. Some
> users may prefer errors to stale data. Can we also add it to the KIP?>>
> Will add this.
>
> Regards,
> Navinder
>
>
> On2019/10/14 16:56:49, Vinoth Chandar wrote:
>
> >Hi Navinder,>
>
> >
>
> >Thanks for sharing the KIP! Few thoughts>
>
> >
>
> >- Can we link the JIRA, discussion thread also to the KIP>
>
> >- Based on the discussion on KAFKA-6144, I was under the impression that>
>
> >this KIP is also going to cover exposing of the standby information in>
>
> >StreamsMetadata and thus subsume KAFKA-8994 . That would require a
> public>
>
> >API change?>
>
> >- KIP seems to be focussing on restoration when a new node is added.>
>
> >KIP-441 is underway and has some major changes proposed for this. It
> would>
>
> >be good to clarify dependencies if any. Without KIP-441, I am not very
> sure>
>
> >if we should allow reads from nodes in RESTORING state, which could
> amount>
>
> >to many minutes/few hours of stale reads?  This is different
> fromallowing>
>
> >querying standby replicas, which could be mostly caught up and the>
>
> >staleness window could be much smaller/tolerable. (once again the focus
> on>
>
> >KAFKA-8994)>
>
> >- Finally, we may need to introduce a configuration to control this.
> Some>
>
> >users may prefer errors to stale data. Can we also add it to the KIP?>
>
> >
>
> >Thanks>
>
> >Vinoth>
>
> >
>
> >
>
> >
>
> >
>
> >On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar>
>
> >wrote:>
>
> >
>
> >> Hi,>
>
> >> Starting a discussion on the KIP to Allow state stores to serve stale>
>
> >> reads during rebalance(>
>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
> >
>
> >> ).>
>
> >> Thanks & Regards,Navinder>
>
> >> LinkedIn>
>
> >>>
> >


Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-15 Thread Navinder Brar
Hi Vinoth,
Thanks for the feedback. 
 Can we link the JIRA, discussion thread also to the KIP.>> Added.
Based on the discussion on KAFKA-6144, I was under the impression that this KIP 
is also going to cover exposing of the standby information in StreamsMetadata 
and thus subsume KAFKA-8994 . That would require a public API change?>> Sure, I 
can add changes for 8994 in this KIP and link  KAFKA-6144 to KAFKA-8994 as well.
  KIP seems to be focussing on restoration when a new node is added. KIP-441 is 
underway and has some major changes proposed for this. It would be good to 
clarify dependencies if any. Without KIP-441, I am not very sure if we should 
allow reads from nodes in RESTORING state, which could amount to many 
minutes/few hours of stale reads?  This is different from allowing querying 
standby replicas, which could be mostly caught up and the staleness window 
could be much smaller/tolerable. (once again the focus on KAFKA-8994).>> I get
your point. But suppose there is a replica which has just become active; in
that case the replica will still be building itself from scratch, and this active
will go to the restoring state till it catches up with the previous active -- wouldn't
serving from a restoring active make more sense than a replica in such a case?

Finally, we may need to introduce a configuration to control this. Some users 
may prefer errors to stale data. Can we also add it to the KIP?>> Will add this.

Regards,
Navinder


On2019/10/14 16:56:49, Vinoth Chandar wrote: 

>Hi Navinder,> 

>

>Thanks for sharing the KIP! Few thoughts> 

>

>- Can we link the JIRA, discussion thread also to the KIP> 

>- Based on the discussion on KAFKA-6144, I was under the impression that> 

>this KIP is also going to cover exposing of the standby information in> 

>StreamsMetadata and thus subsume KAFKA-8994 . That would require a public> 

>API change?> 

>- KIP seems to be focussing on restoration when a new node is added.> 

>KIP-441 is underway and has some major changes proposed for this. It would> 

>be good to clarify dependencies if any. Without KIP-441, I am not very sure>

>if we should allow reads from nodes in RESTORING state, which could amount> 

>to many minutes/few hours of stale reads?  This is different fromallowing> 

>querying standby replicas, which could be mostly caught up and the> 

>staleness window could be much smaller/tolerable. (once again the focus on> 

>KAFKA-8994)> 

>- Finally, we may need to introduce a configuration to control this. Some> 

>users may prefer errors to stale data. Can we also add it to the KIP?> 

>

>Thanks> 

>Vinoth> 

>

>

>

>

>On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar> 

>wrote:> 

>

>> Hi,> 

>> Starting a discussion on the KIP to Allow state stores to serve stale> 

>> reads during rebalance(> 

>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance>

>> ).> 

>> Thanks & Regards,Navinder> 

>> LinkedIn> 

>>> 
> 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-14 Thread Vinoth Chandar
Hi Navinder,

Thanks for sharing the KIP! Few thoughts

- Can we link the JIRA, discussion thread also to the KIP
- Based on the discussion on KAFKA-6144, I was under the impression that
this KIP is also going to cover exposing of the standby information in
StreamsMetadata and thus subsume KAFKA-8994 . That would require a public
API change?
- KIP seems to be focussing on restoration when a new node is added.
KIP-441 is underway and has some major changes proposed for this. It would
be good to clarify dependencies if any. Without KIP-441, I am not very sure
if we should allow reads from nodes in RESTORING state, which could amount
to many minutes/few hours of stale reads?  This is different from allowing
querying standby replicas, which could be mostly caught up and the
staleness window could be much smaller/tolerable. (once again the focus on
KAFKA-8994)
- Finally, we may need to introduce a configuration to control this. Some
users may prefer errors to stale data. Can we also add it to the KIP?

Thanks
Vinoth




On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar
 wrote:

> Hi,
> Starting a discussion on the KIP to Allow state stores to serve stale
> reads during rebalance(
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
> ).
> Thanks & Regards,Navinder
> LinkedIn
>


[DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-13 Thread Navinder Brar
Hi,
Starting a discussion on the KIP to Allow state stores to serve stale reads 
during 
rebalance(https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance).
Thanks & Regards,Navinder 
LinkedIn