Thanks.
I buy the argument about the lag for active tasks.
Nit: The KIP briefly mentions the deprecation of `metadataForKey()`
methods -- those should be listed as `@deprecated` next to the newly
added methods to point this change out more visibly.
Nit: in the code example, why do we loop over
10/20: I think I'm aligned with John's replies as well.
Guozhang
On Fri, Nov 15, 2019 at 1:45 AM Vinoth Chandar
wrote:
>during restoring state the active might have some lag
Great catch, yes, we cannot assume lag = 0 for the active. Let's report active
lag as well then. If the active is too laggy, the app can then deem the store
partition unavailable (based on what the application is willing to
tolerate).
@matthias do
I agree with Vinoth and John on having "allLocalStoreOffsetLags()": not all
actives have 0 lag, since during state restoration the active might have some lag,
and one of the features of this KIP is to provide an option to query from an active
(which might be in a restoring state).
I will update the KIP
Hi all,
Thanks for the "reset", Vinoth. It brings some needed clarity to the discussion.
10. My 2 cents: we might as well include the lags for the active
copies too. This is a more context-free API. If we only include
standbys, this choice won't make sense to users unless they understand
10. I considered that. Had to pick one or the other. We could just return
standbys too and rename the method to, say, "allLocalStandbyOffsetLags()" to
make it explicit. (Standby should implicitly convey that we are talking
about stores.)
20. What I meant was, we are returning HostInfo instead of
In all, is everyone OK with
- Dropping KeyQueryMetadata, and the allMetadataForKey() apis
- Dropping allLagInfo() from KafkaStreams class, Drop StoreLagInfo class
- Add offsetLag(store, key, serializer) -> Optional &
offsetLag(store, key, partitioner) -> Optional to StreamsMetadata
-
- Looking back, I agree that making 2 calls -- first fetching
StreamsMetadata and then using something like 'long
StreamsMetadata#offsetLag(store, key)' as Matthias suggested -- is better than
introducing a new public API, i.e. 'KeyQueryMetadata'. I will change the KIP
accordingly.
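To make the two-call pattern concrete, here is a minimal, self-contained sketch in plain Java of the routing decision it enables on the caller side. The `HostLag` record and `pickHost()` method are hypothetical stand-ins for what the real `StreamsMetadata#offsetLag(store, key)` call would yield per instance; they are not the KIP's actual API.

```java
import java.util.*;

// Hedged sketch of the caller-side routing this KIP enables: collect a
// (host, offsetLag) pair per candidate instance, then route to the freshest
// host whose lag is within the application's tolerance.
public class LagRouting {

    // Illustrative stand-in for the per-instance lag a caller would obtain
    // via something like StreamsMetadata#offsetLag(store, key).
    record HostLag(String host, long offsetLag) {}

    // Route to the host with the smallest lag, but only if that lag is within
    // what the application is willing to tolerate; otherwise report no host.
    static Optional<String> pickHost(List<HostLag> candidates, long maxTolerableLag) {
        return candidates.stream()
                .filter(c -> c.offsetLag() <= maxTolerableLag)
                .min(Comparator.comparingLong(HostLag::offsetLag))
                .map(HostLag::host);
    }

    public static void main(String[] args) {
        List<HostLag> candidates = List.of(
                new HostLag("active:9090", 0L),       // active, fully caught up
                new HostLag("standby1:9090", 120L),   // slightly behind
                new HostLag("standby2:9090", 9000L)); // far behind
        System.out.println(pickHost(candidates, 500L).orElse("none")); // prints active:9090
    }
}
```

The point of this shape is that "acceptable lag" stays a caller-side decision rather than a Streams config, matching the consensus above.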
Regarding 1.2: StreamsMetadata is 1-1 mapped to the streams instances, so
1) allMetadata would still return the same number of StreamsMetadata in
collection, just that within the StreamsMetadata now you have new APIs to
access standby partitions / stores. So I think it would not be a breaking
I agree, that we might want to drop the time-base lag for the initial
implementation. There is no good way to get this information without a
broker side change.
(100) For the offset lag information, I don't see a reason why the app
should drive when this information is updated though, because
Thanks, Guozhang for going through it again.
- 1.1 & 1.2: The main point of adding topicPartition in KeyQueryMetadata is
not the topicName, but the partition number. I agree changelog topicNames and store
names will have a 1-1 mapping, but we also need the partition number of the
changelog for
Hi Navinder, Vinoth, thanks for the updated KIP!
Read through the discussions so far and made another pass on the wiki page,
and here are some more comments:
1. About the public APIs:
1.1. It is not clear to me how allStandbyMetadataForStore
and allStandbyMetadata would be differentiated from
+1 on implementing offset based lag for now and push time-based lag to a later
point in time when broker changes are done. Although time-based lag enhances
the readability, it would not be a make or break change for implementing this
KIP.
Vinoth has explained the role of KeyQueryMetadata, let
+1 to John's suggestion on Duration/Instant and dropping the API to fetch
all stores' lags. However, I do think we need to return lags per topic
partition. So not sure if a single return value would work? We need some new
class that holds a TopicPartition and Duration/Instant variables together?
10)
Navinder,
thanks for updating the KIP. Couple of follow up questions:
(10) Why do we need to introduce the class `KeyQueryMetadata`?
(20) Why do we introduce the two methods `allMetadataForKey()`? Would it
not be simpler to add `Collection
standbyMetadataForKey(...)`. This would align with new
Thanks John and Vinoth for converging thoughts on AssignmentInfo.
- Report the time difference between the last consumed changelog record's
timestamp and the changelog tail record's timestamp. This is an indicator of
how fresh the local copy of a store is with respect to the active copy.
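The time-difference idea above can be sketched in a few lines of plain Java. The method and parameter names here are illustrative only; nothing in this snippet is the actual KIP API, just the arithmetic being proposed.

```java
import java.time.Duration;
import java.time.Instant;

// Hedged sketch of time-based lag: how far the last changelog record applied
// locally trails the changelog's tail record, reported as a Duration.
public class TimeLag {

    // Freshness of a local store copy: tail record timestamp minus the
    // timestamp of the last changelog record this copy has consumed.
    static Duration freshnessLag(Instant lastConsumedRecordTs, Instant tailRecordTs) {
        Duration lag = Duration.between(lastConsumedRecordTs, tailRecordTs);
        // A fully caught-up copy (or minor clock skew) should never report negative lag.
        return lag.isNegative() ? Duration.ZERO : lag;
    }

    public static void main(String[] args) {
        Instant tail = Instant.parse("2019-11-15T10:00:30Z");
        Instant consumed = Instant.parse("2019-11-15T10:00:00Z");
        System.out.println(freshnessLag(consumed, tail).toSeconds()); // prints 30
    }
}
```

Note this only captures the freshness indicator described above; as discussed later in the thread, obtaining the tail record's timestamp cheaply is what would require a broker-side change.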
Hey Vinoth,
Really sorry, I just remembered that I started a reply earlier today,
but got side-tracked.
Regarding the AssignmentInfo extension:
Your explanation for this point makes sense. I was incorrectly
thinking that the cluster metadata was shared with all members, but
now I see it's only
Ping :) Any thoughts?
On Mon, Nov 4, 2019 at 5:13 PM Vinoth Chandar wrote:
>> I'm having some trouble wrapping my head around what race conditions
might occur, other than the fundamentally broken state in which different
instances are running totally different topologies.
3. @both Without the topic partitions that the tasks can map back to, we
have to rely on
Regarding 3) I'm wondering, does your concern still apply even now
that the pluggable PartitionGrouper interface has been deprecated?
Now that we can be sure that the DefaultPartitionGrouper is used to generate
the taskId -> partitions mapping, we should be able to convert any taskId
to any
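The reverse mapping described above can be sketched with plain data types: under the default grouping, a task identified by (topicGroupId, partition) corresponds to that same partition number in every topic of its topic group. The `TaskId` and `TopicPartition` records below are illustrative stand-ins, not the Streams-internal classes.

```java
import java.util.*;

// Hedged sketch of the taskId -> partitions conversion implied by the
// DefaultPartitionGrouper: task (topicGroupId, p) maps to partition p of
// every topic in that topic group.
public class TaskIdMapping {

    record TaskId(int topicGroupId, int partition) {}
    record TopicPartition(String topic, int partition) {}

    // topicGroups: topicGroupId -> the co-partitioned topics in that group
    static List<TopicPartition> partitionsFor(TaskId task,
                                              Map<Integer, List<String>> topicGroups) {
        List<TopicPartition> out = new ArrayList<>();
        for (String topic : topicGroups.getOrDefault(task.topicGroupId(), List.of())) {
            out.add(new TopicPartition(topic, task.partition()));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> groups = Map.of(0, List.of("orders", "orders-repartition"));
        System.out.println(partitionsFor(new TaskId(0, 2), groups));
    }
}
```

As the thread notes, this only holds if the grouping is known and fixed, which is why the deprecation of the pluggable PartitionGrouper matters here.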
Hey Vinoth, thanks for the reply!
3.
I get that it's not the main focus of this KIP, but if it's ok, it
would be nice to hash out this point right now. It only came up
because this KIP-535 is substantially extending the pattern in
question. If we push it off until later, then the reviewers are
3. Right now, we still get the topic partitions assigned as a part of the
top level Assignment object (the one that wraps AssignmentInfo) and use
that to convert taskIds back. This list only contains assignments for
that particular instance. Attempting to also reverse-map "all" the
tasksIds
Adding on to John's response to 3), can you clarify when and why exactly we
cannot
convert between taskIds and partitions? If that's really the case I don't
feel confident
that the StreamsPartitionAssignor is not full of bugs...
It seems like it currently just encodes a list of all partitions
Thanks, all, for considering the points!
3. Interesting. I have a vague recollection of that... Still, though,
it seems a little fishy. After all, we return the assignments
themselves as task ids, and the members have to map these to topic
partitions in order to configure themselves properly. If
Thanks John for going through this.
- +1, makes sense
- +1, no issues there
- Yeah, the initial patch I had submitted for
KAFKA-7149 (https://github.com/apache/kafka/pull/6935) to reduce the AssignmentInfo
object had taskIds, but the merged PR had a similar size according to Vinoth and
it was
1. Was trying to spell them out separately, but makes sense for
readability. Done.
2. No, I immediately agree :) .. makes sense. @navinder?
3. I actually attempted only sending taskIds while working on KAFKA-7149.
It's non-trivial to handle edge cases resulting from newly added topic
partitions
Hey Navinder,
Thanks for updating the KIP, it's a lot easier to see the current
state of the proposal now.
A few remarks:
1. I'm sure it was just an artifact of revisions, but you have two
separate sections where you list additions to the KafkaStreams
interface. Can you consolidate those so we
Hey Vinoth,
I started going over the KIP again yesterday. There are a lot of
updates, and I didn't finish my feedback in one day. I'm working on it
now.
Thanks,
John
On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar wrote:
Wondering if anyone has thoughts on these changes? I liked that the new
metadata fetch APIs provide all the information at once with consistent
naming..
Any guidance on what you would like to be discussed or fleshed out more
before we call a VOTE?
On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar
Hi,
We have made some edits in the
KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance)
after due deliberation on the agreed design to support the new query design.
This includes the new public API to query offset/time lag
>> Since there are two soft votes for separate active/standby API methods, I
>>also change my position on that. Fine with 2 separate methods. Once we remove
>>the lag information from these APIs, returning a List is less attractive,
>>since the ordering has no special meaning now.
Agreed, now
+1 to Sophie's suggestion. Having both lag in terms of time and offsets is
good and makes for a more complete API.
Since there are two soft votes for separate active/standby API methods, I
also change my position on that. Fine with 2 separate methods.
Once we remove the lag information from these
I am jumping in a little late here.
Overall I agree with the proposal to push decision making on what/how to
query in the query layer.
For point 5 from above, I'm slightly in favor of having a new method,
"standbyMetadataForKey()" or something similar.
Because even if we return all tasks in one
I think now we are aligned on almost all the design parts. Summarising below
what has been discussed above and we have a general consensus on.
1. Rather than broadcasting lag across all nodes at rebalancing/with the
heartbeat, we will just return a list of all available standbys in the system
Just to close the loop @Vinoth:
> 1. IIUC John intends to add (or we can do this in this KIP) lag information
> to AssignmentInfo, which gets sent to every participant.
As explained by John, currently KIP-441 plans to only report the
information to the leader. But I guess, with the new proposal
Just to chime in on the "report lag vs timestamp difference" issue, I would
actually advocate for both. As mentioned already, time difference is
probably a lot easier and/or more useful to reason about in terms of
"freshness"
of the state. But in the case when all queried stores are far behind,
Sending the older replies again as they were unreadable due to bad formatting.
>> There was a follow-on idea I POCed to continuously share lag information in
>> the heartbeat protocol
+1 that would be great, I will update the KIP assuming this work will finish
soon
>> I think that adding a
I think I agree with John's recent reasoning as well: instead of letting
the store metadata API return the staleness information, let the
client query either the active or a standby, and let the standby query response
include both the values + timestamp (or lag, as in diffs of timestamps)
+1 As someone implementing a query routing layer, there is already a need
to have healthcheck/failure-detection mechanisms in place to detect
query failures, while Streams rebalancing eventually kicks in in the
background.
So, pushing this complexity to the IQ client app keeps Streams
Hi all,
I've been mulling about this KIP, and I think I was on the wrong track
earlier with regard to task lags. Tl;dr: I don't think we should add
lags at all to the metadata API (and also not to the AssignmentInfo
protocol message).
Like I mentioned early on, reporting lag via
Responding to the points raised by Matthias
1. IIUC John intends to add (or we can do this in this KIP) lag information
to AssignmentInfo, which gets sent to every participant.
2. At least I was under the assumption that it can be called per query,
since the API docs don't seem to suggest
Thanks, everyone for taking a look. Some very cool ideas have flown in.
>> There was a follow-on idea I POCed to continuously share lag information in
>> the heartbeat protocol
+1 that would be great, I will update the KIP assuming
this work will finish soon
>> I think that adding a new
One more thought:
14) Is specifying the allowed lag in number of records a useful way for
users to declare how stale an instance is allowed to be? Would it be
more intuitive for users to specify the allowed lag in time units (would
event time or processing time be better)? It seems hard for users
Some more follow up thoughts:
11) If we get a store handle for an active (restoring) task, and the task
transitions to running, does the store handle become invalid and a new one
must be retrieved? Or can we "switch it out" underneath -- for this
case, how does the user know when they start to query
Thanks for the KIP.
Couple of comments:
1) With regard to KIP-441, my current understanding is that the lag
information is only reported to the leader (please correct me if I am
wrong). This seems to be quite a limitation to actually use the lag
information.
2) The idea of the metadata API is
>>I'm wondering, rather than putting "acceptable lag" into the
configuration at all, or even making it a parameter on `allMetadataForKey`,
why not just _always_ return all available metadata (including
active/standby or lag) and let the caller decide to which node they want to
route the query?
+1
Hi Navinder,
Thanks for the KIP, I have a high level question about the proposed API
regarding:
"StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing...)"
I'm wondering if it's more general to refactor it to
"allMetadataForKey(long tolerableDataStaleness, ...)", and when it's
Hey Navinder,
Thanks for the KIP! I've been reading over the discussion thus far,
and I have a couple of thoughts to pile on as well:
It seems confusing to propose the API in terms of the current system
state, but also propose how the API would look if/when KIP-441 is
implemented. It occurs to
Thanks, Vinoth. Looks like we are on the same page. I will add some of these
explanations to the KIP as well. Have assigned KAFKA-6144 to myself and
KAFKA-8994 is closed (by you). As suggested, we will replace "replica" with
"standby".
In the new API,
Looks like we are covering ground :)
>> Only if it is within a permissible range (say 1) will we serve from the
restoring state of the active.
+1 on having a knob like this.. My reasoning is as follows.
Looking at the Streams state as a read-only distributed KV store: with
num_standby = f, we should
@Vinoth, I have incorporated a few of the discussions we have had in the KIP.
In the current code, t0 and t1 serve queries from the Active (Running) partition.
For case t2, we are planning to return a List ordered such that
if IQ fails on A, the
replica on B can serve the data by
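The failover ordering described here (try the active "A" first, then fall back to the replica "B") can be sketched generically. The `queryWithFailover` helper and the host names are hypothetical; the query function stands in for whatever RPC the IQ routing layer actually makes.

```java
import java.util.*;
import java.util.function.Function;

// Hedged sketch of active-then-standby failover: given an ordered candidate
// list (active first, then standbys), try each host in turn and return the
// first successful response.
public class Failover {

    static <V> Optional<V> queryWithFailover(List<String> orderedHosts,
                                             Function<String, Optional<V>> queryHost) {
        for (String host : orderedHosts) {
            Optional<V> result = queryHost.apply(host); // empty = host failed/unavailable
            if (result.isPresent()) {
                return result;
            }
        }
        return Optional.empty(); // every candidate failed
    }

    public static void main(String[] args) {
        // Simulate active "A" being down while standby "B" can serve the key.
        Map<String, String> reachable = Map.of("B", "value-from-B");
        Optional<String> v = queryWithFailover(List.of("A", "B"),
                host -> Optional.ofNullable(reachable.get(host)));
        System.out.println(v.orElse("unavailable")); // prints value-from-B
    }
}
```

This is exactly why returning an ordered list (rather than a single host) is useful to the routing layer: the fallback order is already encoded.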
Thanks for the updates on the KIP, Navinder!
Few comments
- AssignmentInfo is not a public API. But we will change it and thus need to
increment the version and test for version_probing etc. Good to separate
that from the StreamsMetadata changes (which are public API)
- From what I see, there is going
>>I get your point. But suppose there is a replica which has just become
active, so in that case replica will still be building itself from scratch
and this active will go to restoring state till it catches up with previous
active, wouldn't serving from a restoring active make more sense than a
Hi Vinoth,
Thanks for the feedback.
>> Can we link the JIRA, discussion thread also to the KIP.
Added.
Based on the discussion on KAFKA-6144, I was under the impression that this KIP
is also going to cover exposing of the standby information in StreamsMetadata
and thus subsume KAFKA-8994 . That
Hi Navinder,
Thanks for sharing the KIP! Few thoughts
- Can we link the JIRA, discussion thread also to the KIP
- Based on the discussion on KAFKA-6144, I was under the impression that
this KIP is also going to cover exposing of the standby information in
StreamsMetadata and thus subsume
Hi,
Starting a discussion on the KIP to allow state stores to serve stale reads
during rebalance
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance).
Thanks & Regards,
Navinder