Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Matthias J. Sax
Thanks. I buy the argument about the lag for active tasks. Nit: The KIP briefly mentions the deprecation of `metadataForKey()` methods -- those should be listed as `@deprecated` next to the newly added methods to point this change out more visibly. Nit: in the code example, why do we loop over

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Guozhang Wang
10/20: I think I'm aligned with John's replies as well. Guozhang On Fri, Nov 15, 2019 at 1:45 AM Vinoth Chandar wrote: > >during restoring state the active might have some lag > > Great catch, yes.. we cannot assume lag = 0 for active. Let's report active > lag as well then. If active is too

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Vinoth Chandar
>during restoring state the active might have some lag Great catch, yes.. we cannot assume lag = 0 for active. Let's report active lag as well then. If active is too laggy, the app can then deem the store partition unavailable (based on what the application is willing to tolerate). @matthias do
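As a rough illustration of the point above, the sketch filters store copies by an application-chosen offset-lag bound and does not assume the active copy is caught up. `StoreCopy`, `LagTolerance`, and `queryableHosts` are hypothetical names for illustration, not Kafka Streams API, and how the lag values are obtained is left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical view of one copy (active or standby) of a store partition.
final class StoreCopy {
    final String host;      // illustrative "host:port" string
    final boolean active;   // the active copy may still be restoring
    final long offsetLag;   // changelog end offset minus this copy's position

    StoreCopy(String host, boolean active, long offsetLag) {
        this.host = host;
        this.active = active;
        this.offsetLag = offsetLag;
    }
}

final class LagTolerance {
    // Hosts whose copy is within the app's tolerance: the active copy first
    // (if it qualifies), then standbys by increasing lag. The active copy is
    // filtered like any other copy; its lag is not assumed to be 0.
    // An empty result means the app treats the partition as unavailable.
    static List<String> queryableHosts(List<StoreCopy> copies, long maxOffsetLag) {
        List<StoreCopy> eligible = new ArrayList<>();
        for (StoreCopy c : copies) {
            if (c.offsetLag <= maxOffsetLag) {
                eligible.add(c);
            }
        }
        // false sorts before true, so active copies (!active == false) come first.
        eligible.sort(Comparator.comparing((StoreCopy c) -> !c.active)
                .thenComparingLong(c -> c.offsetLag));
        List<String> hosts = new ArrayList<>();
        for (StoreCopy c : eligible) {
            hosts.add(c.host);
        }
        return hosts;
    }
}
```

Preferring the active copy whenever it qualifies is only one possible policy; an application could just as well rank every copy purely by lag.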

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Navinder Brar
I agree with Vinoth and John on having "allLocalStoreOffsetLags()": not all actives have 0 lag, as during restoring state the active might have some lag, and one of the features of this KIP is to provide an option to query from an active (which might be in restoring state). I will update the KIP

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread John Roesler
Hi all, Thanks for the "reset", Vinoth. It brings some needed clarity to the discussion. 10. My 2 cents: we might as well include the lags for the active copies as well. This is a more context-free API. If we only include standbys, this choice won't make sense to users unless they understand

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Vinoth Chandar
10. I considered that. Had to pick one or the other. Can just return standby too and rename the method to maybe “allLocalStandbyOffsetLags()” to make it explicit. (Standby should implicitly convey that we are talking about stores) 20. What I meant was, we are returning HostInfo instead of

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-12 Thread Vinoth Chandar
In all, is everyone OK with - Dropping KeyQueryMetadata, and the allMetadataForKey() apis - Dropping allLagInfo() from KafkaStreams class, Drop StoreLagInfo class - Add offsetLag(store, key, serializer) -> Optional & offsetLag(store, key, partitioner) -> Optional to StreamsMetadata -
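A minimal sketch of the proposed `offsetLag(store, key, partitioner) -> Optional` shape, assuming the instance keeps a local table of offset lags per store partition and that the result type is `Optional<Long>`. `LocalLagTable` and the toy partitioner used with it are hypothetical; Kafka's default partitioner hashes the serialized key, which this sketch does not reproduce.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.ToIntBiFunction;

// Hypothetical per-instance lag table: store name -> (partition -> offset lag).
final class LocalLagTable {
    private final Map<String, Map<Integer, Long>> lags = new HashMap<>();
    private final int numPartitions;

    LocalLagTable(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    void record(String store, int partition, long offsetLag) {
        lags.computeIfAbsent(store, s -> new HashMap<>()).put(partition, offsetLag);
    }

    // Map the key to its partition with the caller-supplied partitioner, then
    // look up the lag. Empty when this instance hosts no copy of that partition.
    <K> Optional<Long> offsetLag(String store, K key, ToIntBiFunction<K, Integer> partitioner) {
        int partition = partitioner.applyAsInt(key, numPartitions);
        Map<Integer, Long> byPartition = lags.get(store);
        if (byPartition == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(byPartition.get(partition));
    }
}
```

Returning an empty `Optional` lets the caller distinguish "this instance holds no copy of the key's partition" from a real lag value of 0.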

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-12 Thread Navinder Brar
- Looking back, I agree that 2 calls to StreamsMetadata to fetch StreamsMetadata and then using something like ‘long StreamsMetadata#offsetLag(store, key)’ which Matthias suggested is better than introducing a new public API i.e. ‘KeyQueryMetadata’. I will change the KIP accordingly.

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-11 Thread Guozhang Wang
Regarding 1.2: StreamsMetadata is 1-1 mapped to the streams instances, so 1) allMetadata would still return the same number of StreamsMetadata in collection, just that within the StreamsMetadata now you have new APIs to access standby partitions / stores. So I think it would not be a breaking
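To illustrate why this need not be a breaking change, here is a hypothetical per-instance metadata holder that gains a standby accessor while the collection still has exactly one entry per instance. `InstanceMetadata`, `MetadataView`, and their members are illustrative stand-ins, not the actual `StreamsMetadata` class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical per-instance metadata: one object per Streams instance,
// now also exposing the standby partitions that instance hosts.
final class InstanceMetadata {
    final String hostInfo;
    final Set<Integer> activePartitions;
    final Set<Integer> standbyPartitions; // the new, additive accessor in this sketch

    InstanceMetadata(String hostInfo, Set<Integer> active, Set<Integer> standby) {
        this.hostInfo = hostInfo;
        this.activePartitions = Collections.unmodifiableSet(active);
        this.standbyPartitions = Collections.unmodifiableSet(standby);
    }
}

final class MetadataView {
    // Still one entry per instance: adding standby info does not change the count.
    static List<InstanceMetadata> allMetadata(List<InstanceMetadata> instances) {
        return Collections.unmodifiableList(new ArrayList<>(instances));
    }

    // Hosts that hold a standby copy of the given partition.
    static List<String> standbyHostsFor(List<InstanceMetadata> instances, int partition) {
        List<String> hosts = new ArrayList<>();
        for (InstanceMetadata m : instances) {
            if (m.standbyPartitions.contains(partition)) {
                hosts.add(m.hostInfo);
            }
        }
        return hosts;
    }
}
```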

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-09 Thread Matthias J. Sax
I agree that we might want to drop the time-based lag for the initial implementation. There is no good way to get this information without a broker side change. (100) For the offset lag information, I don't see a reason why the app should drive when this information is updated though, because

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-08 Thread Navinder Brar
Thanks, Guozhang for going through it again. - 1.1 & 1.2: The main point of adding topicPartition in KeyQueryMetadata is not topicName, but the partition number. I agree changelog topicNames and store names will have 1-1 mapping but we also need the partition number of the changelog for

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-07 Thread Guozhang Wang
Hi Navinder, Vinoth, thanks for the updated KIP! Read through the discussions so far and made another pass on the wiki page, and here are some more comments: 1. About the public APIs: 1.1. It is not clear to me how allStandbyMetadataForStore and allStandbyMetadata would be differentiated from

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-06 Thread Navinder Brar
+1 on implementing offset-based lag for now and pushing time-based lag to a later point in time when broker changes are done. Although time-based lag enhances the readability, it would not be a make or break change for implementing this KIP. Vinoth has explained the role of KeyQueryMetadata, let

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-06 Thread Vinoth Chandar
+1 to John's suggestion on Duration/Instant and dropping the API to fetch all stores' lags. However, I do think we need to return lags per topic partition. So not sure if a single return value would work? We need some new class that holds a TopicPartition and Duration/Instant variables together? 10)

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-05 Thread Matthias J. Sax
Navinder, thanks for updating the KIP. Couple of follow up questions: (10) Why do we need to introduce the class `KeyQueryMetadata`? (20) Why do we introduce the two methods `allMetadataForKey()`? Would it not be simpler to add `Collection standbyMetadataForKey(...)`. This would align with new

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-05 Thread Navinder Brar
Thanks John and Vinoth for converging thoughts on AssignmentInfo. - Report the time difference between the last consumed changelog record's timestamp and the changelog tail record's timestamp. This is an indicator of how fresh the local copy of a store is with respect to the active copy.
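The freshness indicator described here, together with a plain record-count lag, can be sketched as a small value class that carries both for one changelog partition. `PartitionLag` and its fields are hypothetical, and the sketch assumes the caller already has the changelog end offset and the tail record's timestamp; obtaining that timestamp cheaply is not addressed.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical lag snapshot for one changelog partition backing a local store copy.
final class PartitionLag {
    final String changelogTopic;
    final int partition;
    final long offsetLag;    // records between this copy's position and the changelog end
    final Duration timeLag;  // tail record timestamp minus last consumed record timestamp

    private PartitionLag(String changelogTopic, int partition, long offsetLag, Duration timeLag) {
        this.changelogTopic = changelogTopic;
        this.partition = partition;
        this.offsetLag = offsetLag;
        this.timeLag = timeLag;
    }

    // position: next offset this copy will read; endOffset: changelog log-end offset.
    static PartitionLag of(String changelogTopic, int partition,
                           long position, long endOffset,
                           Instant lastConsumedTimestamp, Instant tailTimestamp) {
        // Clamp at zero: the end offset may have been fetched before the copy caught up.
        long offsetLag = Math.max(0L, endOffset - position);
        // Record timestamps need not increase monotonically, so clamp negatives too.
        Duration timeLag = Duration.between(lastConsumedTimestamp, tailTimestamp);
        if (timeLag.isNegative()) {
            timeLag = Duration.ZERO;
        }
        return new PartitionLag(changelogTopic, partition, offsetLag, timeLag);
    }
}
```

Keeping both numbers side by side reflects the trade-off raised in the thread: the time difference is easier to reason about as "freshness", while the offset lag needs no timestamp lookup at the changelog tail.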

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-05 Thread John Roesler
Hey Vinoth, Really sorry, I just remembered that I started a reply earlier today, but got side-tracked. Regarding the AssignmentInfo extension: Your explanation for this point makes sense. I was incorrectly thinking that the cluster metadata was shared with all members, but now I see it's only

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-05 Thread Vinoth Chandar
Ping :) Any thoughts? On Mon, Nov 4, 2019 at 5:13 PM Vinoth Chandar wrote: > >> I'm having some trouble wrapping my head around what race conditions > might occur, other than the fundamentally broken state in which different > instances are running totally different topologies. > 3. @both

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-04 Thread Vinoth Chandar
>> I'm having some trouble wrapping my head around what race conditions might occur, other than the fundamentally broken state in which different instances are running totally different topologies. 3. @both Without the topic partitions that the tasks can map back to, we have to rely on

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-04 Thread Sophie Blee-Goldman
Regarding 3) I'm wondering, does your concern still apply even now that the pluggable PartitionGrouper interface has been deprecated? Now that we can be sure that the DefaultPartitionGrouper is used to generate the taskId -> partitions mapping, we should be able to convert any taskId to any
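Assuming the default grouping rule, in which task `(topicGroupId, p)` owns partition `p` of every source topic in that topic group, a task id can be converted back to partitions roughly as follows. The conversion needs the topic groups and partition counts for the whole topology, not only the partitions assigned to the local instance. `TaskPartitions` and its nested `TaskId` are hypothetical stand-ins, and topic-partitions are rendered as plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class TaskPartitions {
    // Hypothetical stand-in for a task id: (topic group id, partition id).
    static final class TaskId {
        final int topicGroupId;
        final int partition;

        TaskId(int topicGroupId, int partition) {
            this.topicGroupId = topicGroupId;
            this.partition = partition;
        }
    }

    // Under the assumed rule, task (g, p) owns partition p of each source topic
    // in group g that has more than p partitions. Returns "topic-partition" strings.
    static List<String> partitionsFor(TaskId task,
                                      Map<Integer, List<String>> sourceTopicsByGroup,
                                      Map<String, Integer> partitionCounts) {
        List<String> result = new ArrayList<>();
        List<String> topics = sourceTopicsByGroup.get(task.topicGroupId);
        if (topics == null) {
            return result; // unknown topic group: nothing to map
        }
        for (String topic : topics) {
            Integer count = partitionCounts.get(topic);
            if (count != null && task.partition < count) {
                result.add(topic + "-" + task.partition);
            }
        }
        return result;
    }
}
```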

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-04 Thread John Roesler
Hey Vinoth, thanks for the reply! 3. I get that it's not the main focus of this KIP, but if it's ok, it would be nice to hash out this point right now. It only came up because this KIP-535 is substantially extending the pattern in question. If we push it off until later, then the reviewers are

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-04 Thread Vinoth Chandar
3. Right now, we still get the topic partitions assigned as a part of the top level Assignment object (the one that wraps AssignmentInfo) and use that to convert taskIds back. This list only contains assignments for that particular instance. Attempting to also reverse map for "all" the taskIds

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-01 Thread Sophie Blee-Goldman
Adding on to John's response to 3), can you clarify when and why exactly we cannot convert between taskIds and partitions? If that's really the case I don't feel confident that the StreamsPartitionAssignor is not full of bugs... It seems like it currently just encodes a list of all partitions

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-01 Thread John Roesler
Thanks, all, for considering the points! 3. Interesting. I have a vague recollection of that... Still, though, it seems a little fishy. After all, we return the assignments themselves as task ids, and the members have to map these to topic partitions in order to configure themselves properly. If

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-31 Thread Navinder Brar
Thanks John for going through this. - +1, makes sense - +1, no issues there - Yeah the initial patch I had submitted for K-7149 (https://github.com/apache/kafka/pull/6935) to reduce assignmentInfo object had taskIds but the merged PR had similar size according to Vinoth and it was

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-31 Thread Vinoth Chandar
1. Was trying to spell them out separately, but it makes sense for readability. Done. 2. No, I immediately agree :) .. makes sense. @navinder? 3. I actually attempted only sending taskIds while working on KAFKA-7149. It's non-trivial to handle edge cases resulting from newly added topic partitions

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-31 Thread John Roesler
Hey Navinder, Thanks for updating the KIP, it's a lot easier to see the current state of the proposal now. A few remarks: 1. I'm sure it was just an artifact of revisions, but you have two separate sections where you list additions to the KafkaStreams interface. Can you consolidate those so we

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-31 Thread John Roesler
Hey Vinoth, I started going over the KIP again yesterday. There are a lot of updates, and I didn't finish my feedback in one day. I'm working on it now. Thanks, John On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar wrote: > > Wondering if anyone has thoughts on these changes? I liked that the

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-31 Thread Vinoth Chandar
Wondering if anyone has thoughts on these changes? I liked that the new metadata fetch APIs provide all the information at once with consistent naming.. Any guidance on what you would like to be discussed or fleshed out more before we call a VOTE? On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-30 Thread Navinder Brar
Hi, We have made some edits in the KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance) after due deliberation on the agreed design to support the new query design. This includes the new public API to query offset/time lag

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-26 Thread Navinder Brar
>> Since there are two soft votes for separate active/standby API methods, I also change my position on that. Fine with 2 separate methods. Once we remove the lag information from these APIs, returning a List is less attractive, since the ordering has no special meaning now. Agreed, now

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Vinoth Chandar
+1 to Sophie's suggestion. Having both lag in terms of time and offsets is good and makes for a more complete API. Since there are two soft votes for separate active/standby API methods, I also change my position on that. Fine with 2 separate methods. Once we remove the lag information from these

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Bill Bejeck
I am jumping in a little late here. Overall I agree with the proposal to push decision making on what/how to query in the query layer. For point 5 from above, I'm slightly in favor of having a new method, "standbyMetadataForKey()" or something similar. Because even if we return all tasks in one

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Navinder Brar
I think now we are aligned on almost all the design parts. Summarising below what has been discussed above and we have a general consensus on. - Rather than broadcasting lag across all nodes at rebalancing/with the heartbeat, we will just return a list of all available standbys in the

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Navinder Brar
I think now we are aligned on almost all the design parts. Summarising below what has been discussed above and we have a general consensus on. 1. Rather than broadcasting lag across all nodes at rebalancing/with the heartbeat, we will just return a list of all available standbys in the system

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Matthias J. Sax
Just to close the loop @Vinoth: > 1. IIUC John intends to add (or we can do this in this KIP) lag information > to AssignmentInfo, which gets sent to every participant. As explained by John, currently KIP-441 plans to only report the information to the leader. But I guess, with the new proposal

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Sophie Blee-Goldman
Just to chime in on the "report lag vs timestamp difference" issue, I would actually advocate for both. As mentioned already, time difference is probably a lot easier and/or more useful to reason about in terms of "freshness" of the state. But in the case when all queried stores are far behind,

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Navinder Brar
Sending the older replies again as they were unreadable due to bad formatting. >> There was a follow-on idea I POCed to continuously share lag information in >> the heartbeat protocol +1 that would be great, I will update the KIP assuming this work will finish soon >> I think that adding a

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-24 Thread Guozhang Wang
I think I agree with John's recent reasoning as well: instead of letting the store metadata API return the staleness information, let the client query either active or standby, and let the standby query response include both the values + timestamp (or lag, as in diffs of timestamps)
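One way to picture the response shape described here: the serving instance returns the value together with the timestamp of the latest changelog record applied to its copy, and the client decides whether that is fresh enough. `QueryResponse`, `staleness`, and `acceptable` are hypothetical names, and the reference time (wall clock or the changelog tail's timestamp) is an assumption left to the caller.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical interactive-query response: the value plus a freshness marker
// for the copy (active or standby) that answered.
final class QueryResponse<V> {
    final V value;
    final Instant lastAppliedTimestamp; // latest changelog record applied to the answering copy

    QueryResponse(V value, Instant lastAppliedTimestamp) {
        this.value = value;
        this.lastAppliedTimestamp = lastAppliedTimestamp;
    }

    // Staleness relative to a reference time chosen by the client; never negative.
    Duration staleness(Instant reference) {
        Duration d = Duration.between(lastAppliedTimestamp, reference);
        return d.isNegative() ? Duration.ZERO : d;
    }

    // Client-side acceptance rule: accept the answer if it is within the bound.
    boolean acceptable(Instant reference, Duration maxStaleness) {
        return staleness(reference).compareTo(maxStaleness) <= 0;
    }
}
```

Because the decision lives in the client, different queries against the same store can apply different staleness bounds without any change on the Streams side.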

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-24 Thread Vinoth Chandar
+1 As someone implementing a query routing layer, there is already a need to have mechanisms in place to do healthchecks/failure detection to detect failures for queries, while Streams rebalancing eventually kicks in the background. So, pushing this complexity to the IQ client app keeps Streams
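A minimal sketch of the client-side routing loop this implies: try candidate hosts in order (for example the active, then standbys), skip hosts the application's own health checks have flagged, and fall through to the next host when a call throws. `QueryRouter` and the `remoteQuery` callback are hypothetical; a real layer would add timeouts and feed failures back into its failure detector.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

final class QueryRouter {
    // Tries each candidate host in order. A null result from a reachable host is
    // treated as "key not found" and ends the search; an exception moves on to
    // the next host. Returns empty if no host could answer.
    static <R> Optional<R> route(List<String> candidates,
                                 Set<String> unhealthy,
                                 Function<String, R> remoteQuery) {
        for (String host : candidates) {
            if (unhealthy.contains(host)) {
                continue; // flagged by the app's own health checks
            }
            try {
                return Optional.ofNullable(remoteQuery.apply(host));
            } catch (RuntimeException e) {
                // Treat as a failure of this host and try the next candidate.
            }
        }
        return Optional.empty();
    }
}
```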

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-24 Thread John Roesler
Hi all, I've been mulling over this KIP, and I think I was on the wrong track earlier with regard to task lags. Tl;dr: I don't think we should add lags at all to the metadata API (and also not to the AssignmentInfo protocol message). Like I mentioned early on, reporting lag via

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-23 Thread Vinoth Chandar
Responding to the points raised by Matthias 1. IIUC John intends to add (or we can do this in this KIP) lag information to AssignmentInfo, which gets sent to every participant. 2. At least I was under the assumption that it can be called per query, since the API docs don't seem to suggest

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-23 Thread Navinder Brar
Thanks, everyone, for taking a look. Some very cool ideas have flown in. >> There was a follow-on idea I POCed to continuously share lag information in >> the heartbeat protocol +1 that would be great, I will update the KIP assuming >> this work will finish soon >> I think that adding a new

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-22 Thread Matthias J. Sax
One more thought: 14) Is specifying the allowed lag in number of records a useful way for users to declare how stale an instance is allowed to be? Would it be more intuitive for users to specify the allowed lag in time units (would event time or processing time be better)? It seems hard for users

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-21 Thread Matthias J. Sax
Some more follow up thoughts: 11) If we get a store handle of an active(restoring) task, and the task transits to running, does the store handle become invalid and a new one must be retrieved? Or can we "switch it out" underneath -- for this case, how does the user know when they start to query

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-21 Thread Matthias J. Sax
Thanks for the KIP. Couple of comments: 1) With regard to KIP-441, my current understanding is that the lag information is only reported to the leader (please correct me if I am wrong). This seems to be quite a limitation to actually use the lag information. 2) The idea of the metadata API is

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-21 Thread Vinoth Chandar
>>I'm wondering, rather than putting "acceptable lag" into the configuration at all, or even making it a parameter on `allMetadataForKey`, why not just _always_ return all available metadata (including active/standby or lag) and let the caller decide to which node they want to route the query? +1

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-21 Thread Guozhang Wang
Hi Navinder, Thanks for the KIP, I have a high level question about the proposed API regarding: "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing...)" I'm wondering if it's more general to refactor it to "allMetadataForKey(long tolerableDataStaleness, ...)", and when it's

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-21 Thread John Roesler
Hey Navinder, Thanks for the KIP! I've been reading over the discussion thus far, and I have a couple of thoughts to pile on as well: It seems confusing to propose the API in terms of the current system state, but also propose how the API would look if/when KIP-441 is implemented. It occurs to

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-19 Thread Navinder Brar
Thanks, Vinoth. Looks like we are on the same page. I will add some of these explanations to the KIP as well. Have assigned KAFKA-6144 to myself and KAFKA-8994 is closed (by you). As suggested, we will replace "replica" with "standby". In the new API,

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-17 Thread Vinoth Chandar
Looks like we are covering ground :) >>Only if it is within a permissible range (say 1) we will serve from Restoring state of active. +1 on having a knob like this.. My reasoning is as follows. Looking at the Streams state as a read-only distributed kv store. With num_standby = f, we should

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-16 Thread Navinder Brar
@Vinoth, I have incorporated a few of the discussions we have had in the KIP.  In the current code, t0 and t1 serve queries from Active(Running) partition. For case t2, we are planning to return List such that it returns so that if IQ fails on A, the replica on B can serve the data by

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-16 Thread Vinoth Chandar
Thanks for the updates on the KIP, Navinder! A few comments - AssignmentInfo is not public API? But we will change it and thus need to increment the version and test for version_probing etc. Good to separate that from StreamsMetadata changes (which is public API) - From what I see, there is going
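Because AssignmentInfo is versioned, adding standby information means bumping its version and making sure a peer on a different version fails safely. The simplified, hypothetical encoding below is not the actual AssignmentInfo wire format; it only shows the pattern of writing the version first, reading optional fields based on it, and refusing versions newer than the decoder understands.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified versioned assignment message:
// version 1 carries active partitions; version 2 also carries standby partitions.
final class VersionedAssignment {
    static final int LATEST_SUPPORTED_VERSION = 2;

    final int version;
    final List<Integer> active;
    final List<Integer> standby; // always empty when the version is 1

    VersionedAssignment(int version, List<Integer> active, List<Integer> standby) {
        this.version = version;
        this.active = new ArrayList<Integer>(active);
        this.standby = version >= 2 ? new ArrayList<Integer>(standby) : new ArrayList<Integer>();
    }

    ByteBuffer encode() {
        int size = 4 + listSize(active) + (version >= 2 ? listSize(standby) : 0);
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(version);
        writeList(buf, active);
        if (version >= 2) {
            writeList(buf, standby); // field only present from version 2 on
        }
        buf.flip();
        return buf;
    }

    static VersionedAssignment decode(ByteBuffer buf) {
        int version = buf.getInt();
        if (version > LATEST_SUPPORTED_VERSION) {
            // Newer sender: refuse rather than misread the remaining bytes, so the
            // caller can fall back to a version both sides understand.
            throw new IllegalStateException("Unsupported assignment version " + version);
        }
        List<Integer> active = readList(buf);
        List<Integer> standby = version >= 2 ? readList(buf) : new ArrayList<Integer>();
        return new VersionedAssignment(version, active, standby);
    }

    private static int listSize(List<Integer> values) {
        return 4 + 4 * values.size(); // length prefix plus one int per entry
    }

    private static void writeList(ByteBuffer buf, List<Integer> values) {
        buf.putInt(values.size());
        for (int v : values) {
            buf.putInt(v);
        }
    }

    private static List<Integer> readList(ByteBuffer buf) {
        int n = buf.getInt();
        List<Integer> values = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            values.add(buf.getInt());
        }
        return values;
    }
}
```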

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-15 Thread Vinoth Chandar
>>I get your point. But suppose there is a replica which has just become active, so in that case replica will still be building itself from scratch and this active will go to restoring state till it catches up with previous active, wouldn't serving from a restoring active make more sense than a

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-15 Thread Navinder Brar
Hi Vinoth, Thanks for the feedback.  Can we link the JIRA, discussion thread also to the KIP.>> Added. Based on the discussion on KAFKA-6144, I was under the impression that this KIP is also going to cover exposing of the standby information in StreamsMetadata and thus subsume KAFKA-8994 . That

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-14 Thread Vinoth Chandar
Hi Navinder, Thanks for sharing the KIP! Few thoughts - Can we link the JIRA, discussion thread also to the KIP - Based on the discussion on KAFKA-6144, I was under the impression that this KIP is also going to cover exposing of the standby information in StreamsMetadata and thus subsume

[DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-13 Thread Navinder Brar
Hi, Starting a discussion on the KIP to Allow state stores to serve stale reads during rebalance (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance). Thanks & Regards, Navinder