Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
During the development of KIP-630 we made some minor changes to the KIP to better match the implementation details. Here is a summary of the changes we made to the KIP:

1. Added control records at the beginning and end of the snapshots. The control records are versioned. The snapshot header record includes the append time of the last record from the log included in the snapshot. This is useful when determining when to delete a snapshot.
2. The configuration property metadata.snapshot.min.new_records.size was renamed to metadata.log.max.record.bytes.between.snapshots.
3. The FetchSnapshotRequest schema was changed to include the cluster id and the current leader epoch. This is used by the leader of the metadata log to validate that the request matches the current cluster id and the current leader epoch.
4. The FetchSnapshotResponse schema was changed to include the snapshot id.

KIP-630: https://cwiki.apache.org/confluence/x/exV4CQ

Thanks!
-Jose
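The control records in change 1 can be pictured with a small sketch. This is an illustration only, not the exact schema from the KIP; the field and function names here are assumptions:

```python
from dataclasses import dataclass

# Illustrative sketch of the versioned control records that frame a snapshot.
# Field names are assumptions; the exact schema is defined in KIP-630.

@dataclass
class SnapshotHeaderRecord:
    version: int                       # schema version of the control record
    last_contained_log_timestamp: int  # append time (ms) of the last log record
                                       # included in the snapshot

@dataclass
class SnapshotFooterRecord:
    version: int  # schema version of the control record

def is_snapshot_expired(header: SnapshotHeaderRecord,
                        now_ms: int, retention_ms: int) -> bool:
    """A snapshot whose newest contained record is older than the retention
    window is a candidate for deletion; this is why the header carries the
    append time of the last contained log record."""
    return now_ms - header.last_contained_log_timestamp > retention_ms

header = SnapshotHeaderRecord(version=0, last_contained_log_timestamp=1_000)
print(is_snapshot_expired(header, now_ms=10_000, retention_ms=5_000))  # True
```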
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
I am going to propose that we take a vote on this KIP. Thank you Jason, Ron, Jun and Guozhang for the feedback and discussion.

On Fri, Oct 2, 2020 at 3:11 PM Jose Garcia Sancio wrote:
>
> Thank you Jun!
>
> Changes:
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=37=36
>
> > 41. Perhaps metadata.snapshot.min.records.size can just
> > be metadata.snapshot.min.records?
>
> Sounds good to me. Done.
>
> > 42. It's probably fine to change maxBytes for Fetch in a separate PR. I
> > brought this up since this KIP is changing the Fetch request.
>
> Okay. Minor nit. KIP-630 changed the Fetch response. We will bump the
> Fetch request version because it is required but the KIP doesn't make
> any changes to the fields in the Fetch request.
>
> -Jose

--
-Jose
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Thank you Jun!

Changes:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=37=36

> 41. Perhaps metadata.snapshot.min.records.size can just
> be metadata.snapshot.min.records?

Sounds good to me. Done.

> 42. It's probably fine to change maxBytes for Fetch in a separate PR. I
> brought this up since this KIP is changing the Fetch request.

Okay. Minor nit. KIP-630 changed the Fetch response. We will bump the Fetch request version because it is required but the KIP doesn't make any changes to the fields in the Fetch request.

-Jose
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Hi, Jose,

Thanks for the updated KIP. Just a couple of minor comments.

41. Perhaps metadata.snapshot.min.records.size can just be metadata.snapshot.min.records?

42. It's probably fine to change maxBytes for Fetch in a separate PR. I brought this up since this KIP is changing the Fetch request.

Thanks,

Jun

On Fri, Oct 2, 2020 at 12:03 PM Jose Garcia Sancio wrote:

> I read through KIP-630 and made the following minor changes.
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=36=35
>
> --
> -Jose
>
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
I read through KIP-630 and made the following minor changes.

https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=36=35

--
-Jose
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Comments below. Here are the changes to the KIP:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=34=33

> 41. That's a good point. With compacted topics, the cleaning won't be done
> until the active segment rolls. With snapshots, I guess we don't have this
> restriction? So, it would be useful to guard against too frequent
> snapshotting. Does the new proposal address this completely? If the
> snapshot has only 1 record, and the new record keeps updating the same key,
> does that still cause the snapshot to be generated frequently?

That is true. In addition to metadata.snapshot.min.cleanable.ratio we can add the following configuration:

metadata.snapshot.min.records.size - This is the minimum number of bytes in the replicated log between the latest snapshot and the high-watermark needed before generating a new snapshot. The default is 20MB.

Both configurations need to be satisfied before generating a new snapshot. I have updated the KIP.

> 42. One of the reasons that we added the per partition limit is to allow
> each partition to make relatively even progress during the catchup phase.
> This helps kstreams by potentially reducing the gap between stream time
> from different partitions. If we can achieve the same thing without the
> partition limit, it will be fine too. "3. For the remaining partitions send
> at most the average max bytes plus the average remaining bytes." Do we
> guarantee that request level max_bytes can be filled up when it can? Could
> we document how we distribute the request level max_bytes to partitions in
> the KIP?

I want to allow some flexibility in the implementation. How about the following update to the FetchSnapshot Request Handling section:

3. Send the bytes in the snapshot from Position. If there are multiple partitions in the FetchSnapshot request, then the leader will evenly distribute the number of bytes sent across all of the partitions.
The leader will not send more bytes in the response than ResponseMaxBytes, the minimum of MaxBytes in the request and the value configured in replica.fetch.response.max.bytes.

a. Each topic partition is guaranteed to receive at least the average ResponseMaxBytes if that snapshot has enough bytes remaining.
b. If there are topic partitions with snapshots that have remaining bytes less than the average ResponseMaxBytes, then those bytes may be used to send snapshot bytes for other topic partitions.

I should also point out that in practice for KIP-630 this request will only have one topic partition (__cluster_metadata-0). I should also point out that FetchSnapshot is sending bytes, not records, so there is no requirement that the response must contain at least one record like Fetch.

> Also, should we change Fetch accordingly?

If we want to make this change I think we should do this in another KIP. What do you think?

> 46. If we don't have IBP, how do we make sure that a broker doesn't
> issue FetchSnapshotRequest when the receiving broker hasn't been upgraded
> yet?

For a broker to send a FetchSnapshotRequest it means that it received a FetchResponse that contained a SnapshotId field. For the leader to send a SnapshotId in the FetchResponse it means that the leader is executing code that knows how to handle FetchSnapshotRequests.

The inverse is also true. For the follower to receive a SnapshotId in the FetchResponse it means that it sent the FetchRequest to the leader of the __cluster_metadata-0 topic partition. Only the KafkaRaftClient will send that fetch request.

After writing the above, I see what you are saying. The broker needs to know if it should enable the KafkaRaftClient and send FetchRequests to the __cluster_metadata-0 topic partition. I think that there is also a question of how to perform a rolling migration of a cluster from ZK to KIP-500. I think we will write a future KIP that documents this process. Thanks for your help here.
For now, I'll mention that we will bump the IBP. The new wording for the "Compatibility, Deprecation, and Migration Plan" section:

This KIP is only implemented for the internal topic __cluster_metadata. The inter-broker protocol (IBP) will be increased to indicate that all of the brokers in the cluster support KIP-595 and KIP-630.

Thanks,
-Jose
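The byte-distribution heuristic discussed in this thread (at least the per-partition average for partitions that need it, with unused budget redistributed to the rest) can be sketched as follows. This is a minimal illustration, not the actual Kafka implementation; the function name and the `remaining` map are assumptions:

```python
def distribute_response_bytes(response_max_bytes: int, remaining: dict) -> dict:
    """Distribute a request-level byte budget across snapshot partitions.

    `remaining` maps each partition to the snapshot bytes it still needs.
    Partitions needing less than the per-partition average get all of their
    bytes; the unused budget is shared among the remaining partitions.
    """
    if not remaining:
        return {}
    average = response_max_bytes // len(remaining)

    # Partitions that fit entirely within the average are fully served.
    allocation = {p: n for p, n in remaining.items() if n <= average}
    unused = sum(average - n for n in allocation.values())

    # The remaining partitions split the leftover budget evenly, so the
    # request-level budget is filled up when it can be.
    rest = [p for p in remaining if p not in allocation]
    if rest:
        extra = unused // len(rest)
        for p in rest:
            allocation[p] = min(remaining[p], average + extra)
    return allocation

# Partition 1 needs only 10 bytes, so its leftover budget helps partition 0.
print(distribute_response_bytes(100, {0: 500, 1: 10}))  # {1: 10, 0: 90}
```

Note the computation is a single pass over the partitions, matching the point in the thread that the heuristic is performed once rather than at worst N times for N partitions.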
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Thanks for the clarification Jose, that clears my confusion already :)

Guozhang

On Thu, Oct 1, 2020 at 10:51 AM Jose Garcia Sancio wrote:

> Thanks for the email Guozhang.
>
> > Thanks for the replies and the KIP updates. Just want to clarify one more
> > thing regarding my previous comment 3): I understand that when a snapshot
> > has completed loading, then we can use it in our handling logic of vote
> > requests. And I understand that:
> >
> > 1) Before a snapshot has been completely received (e.g. if we've only
> > received a subset of the "chunks"), then we just handle vote requests as
> > if there's no snapshot yet.
> >
> > 2) After a snapshot has been completely received and loaded into main
> > memory, we can handle vote requests as of the received snapshot.
> >
> > What I'm wondering is, in between these two synchronization barriers,
> > after all the snapshot chunks have been received but before they have
> > been completely parsed and loaded into the memory's metadata cache, if we
> > receive a request (note they may be handled by different threads, hence
> > concurrently), what should we do? Or are you proposing that the
> > FetchSnapshot request would also be handled in that single-threaded raft
> > client loop so it is in order with all other requests? If that's the case
> > then we do not have any concurrency issues to worry about, but then on
> > the other hand the reception of the last snapshot chunk and loading it
> > into main memory may also take a long time, during which a client may not
> > be able to handle any other requests.
>
> Yes. The FetchSnapshot request and response handling will be performed
> by the KafkaRaftClient in a single-threaded fashion. The
> KafkaRaftClient doesn't need to load the snapshot to know what state
> it is in. It only needs to scan the "checkpoints" folder, load the
> quorum state file and know the LEO of the replicated log.
> I would modify 2) above to the following:
>
> 3) After the snapshot has been validated by
> a) Fetching all of the chunks
> b) Verifying the CRC of the records in the snapshot
> c) Atomically moving the temporary snapshot to the permanent location
>
> After 3.c), the KafkaRaftClient only needs to scan and parse the
> filenames in the directory called "checkpoints" to find the
> largest/latest permanent snapshot.
>
> As you point out in 1), before 3.c) the KafkaRaftClient, in regards to
> leader election, will behave as if the temporary snapshot didn't
> exist.
>
> The loading of the snapshot will be done by the state machine (Kafka
> Controller or Metadata Cache) and it can perform this on a different
> thread. The KafkaRaftClient will provide an API for finding and
> reading the latest valid snapshot stored locally.
>
> Are you also concerned that the snapshot could have been corrupted after
> 3.c?
>
> I also updated the "Changes to Leader Election" section to make this a
> bit clearer.
>
> Thanks,
> Jose

--
Guozhang
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Hi, Jose,

Thanks for the reply. A few more comments.

41. That's a good point. With compacted topics, the cleaning won't be done until the active segment rolls. With snapshots, I guess we don't have this restriction? So, it would be useful to guard against too frequent snapshotting. Does the new proposal address this completely? If the snapshot has only 1 record, and the new record keeps updating the same key, does that still cause the snapshot to be generated frequently?

42. One of the reasons that we added the per partition limit is to allow each partition to make relatively even progress during the catchup phase. This helps kstreams by potentially reducing the gap between stream time from different partitions. If we can achieve the same thing without the partition limit, it will be fine too. "3. For the remaining partitions send at most the average max bytes plus the average remaining bytes." Do we guarantee that request level max_bytes can be filled up when it can? Could we document how we distribute the request level max_bytes to partitions in the KIP? Also, should we change Fetch accordingly?

46. If we don't have IBP, how do we make sure that a broker doesn't issue FetchSnapshotRequest when the receiving broker hasn't been upgraded yet?

Thanks,

Jun

On Thu, Oct 1, 2020 at 10:09 AM Jose Garcia Sancio wrote:

> Thank you for the quick response Jun. Excuse the delayed response but
> I wanted to confirm some things regarding IBP. See comments below.
>
> Here are my changes to the KIP:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=30=28
>
> > 40. LBO: Code wise, logStartOffset is used in so many places like Log,
> > ReplicaManager, LogCleaner, ReplicaFetcher, checkpoint files, etc. I am
> > not sure if it's worth renaming in all those places. If the main concern
> > is to avoid confusion, we can just always spell out logStartOffset.
>
> Done. Keeping it as LogStartOffset is better.
> I was also concerned
> with external tools that may be generating code from the JSON schema.
>
> > 41. metadata.snapshot.min.cleanable.ratio: Since the snapshot could be
> > empty initially, it's probably better to define the ratio as new_data_size
> > / (new_data_size + snapshot_size). This avoids the dividing-by-zero issue
> > and is also consistent with the cleaner ratio definition for compacted
> > topics.
>
> I am assuming that snapshot_size is the size of the largest snapshot
> on disk, is this correct? If we use this formula then we will generate
> snapshots very quickly if the snapshot on disk is zero or very small.
>
> In general what we care about is whether the replicated log has a lot of
> records that delete or update records in the snapshot. I was thinking of
> something along the lines of the following formula:
>
> (size of deleted snapshot records + size of updated records) / (total
> size of snapshot), where total size of snapshot is greater than zero.
> 0, where total size of snapshot is zero
>
> This means that in the extreme case where the replicated log only
> contains "addition" records then we never generate a snapshot. I think
> this is the desired behavior since generating a snapshot will consume
> disk bandwidth without saving disk space. What do you think?
>
> > 42. FetchSnapshotRequest: Since this is designed to fetch more than one
> > partition, it seems it's useful to have a per-partition maxBytes, in
> > addition to the request level maxBytes, just like a Fetch request?
>
> Yeah, we have debated this in another thread from Jason. The argument
> is that MaxBytes at the top level is all that we need if we implement
> the following heuristic:
>
> 1. Compute the average max bytes per partition by dividing the max by
> the number of partitions in the request.
> 2. For all of the partitions with remaining bytes less than this
> average max bytes, send all of those bytes and sum the remaining
> bytes.
> 3.
> For the remaining partitions send at most the average max bytes
> plus the average remaining bytes.
>
> Note that this heuristic will only be performed once and not at worst
> N times for N partitions.
>
> What do you think? Besides consistency with Fetch requests, is there
> another reason to have MaxBytes per partition?
>
> > 43. FetchSnapshotResponse:
> > 43.1 I think the confusing part for OFFSET_OUT_OF_RANGE is
> > that FetchSnapshotRequest includes EndOffset. So, OFFSET_OUT_OF_RANGE
> > seems to suggest that the provided EndOffset is wrong, which is not the
> > intention for the error code.
>
> Yeah. Added a new error called POSITION_OUT_OF_RANGE.
>
> > 43.2 Position field seems to be the same as the one in
> > FetchSnapshotRequest. If we have both, should the requester verify the
> > consistency between the two values and what should the requester do if
> > the two values don't match?
>
> Yeah, the Position in the response will be the same value as the
> Position in the request. I was thinking of only verifying Position
> against the state on the temporary
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Thanks for the email Guozhang.

> Thanks for the replies and the KIP updates. Just want to clarify one more
> thing regarding my previous comment 3): I understand that when a snapshot
> has completed loading, then we can use it in our handling logic of vote
> requests. And I understand that:
>
> 1) Before a snapshot has been completely received (e.g. if we've only
> received a subset of the "chunks"), then we just handle vote requests as
> if there's no snapshot yet.
>
> 2) After a snapshot has been completely received and loaded into main
> memory, we can handle vote requests as of the received snapshot.
>
> What I'm wondering is, in between these two synchronization barriers,
> after all the snapshot chunks have been received but before they have
> been completely parsed and loaded into the memory's metadata cache, if we
> receive a request (note they may be handled by different threads, hence
> concurrently), what should we do? Or are you proposing that the
> FetchSnapshot request would also be handled in that single-threaded raft
> client loop so it is in order with all other requests? If that's the case
> then we do not have any concurrency issues to worry about, but then on
> the other hand the reception of the last snapshot chunk and loading it
> into main memory may also take a long time, during which a client may not
> be able to handle any other requests.

Yes. The FetchSnapshot request and response handling will be performed by the KafkaRaftClient in a single-threaded fashion. The KafkaRaftClient doesn't need to load the snapshot to know what state it is in. It only needs to scan the "checkpoints" folder, load the quorum state file and know the LEO of the replicated log.
I would modify 2) above to the following:

3) After the snapshot has been validated by
a) Fetching all of the chunks
b) Verifying the CRC of the records in the snapshot
c) Atomically moving the temporary snapshot to the permanent location

After 3.c), the KafkaRaftClient only needs to scan and parse the filenames in the directory called "checkpoints" to find the largest/latest permanent snapshot.

As you point out in 1), before 3.c) the KafkaRaftClient, in regards to leader election, will behave as if the temporary snapshot didn't exist.

The loading of the snapshot will be done by the state machine (Kafka Controller or Metadata Cache) and it can perform this on a different thread. The KafkaRaftClient will provide an API for finding and reading the latest valid snapshot stored locally.

Are you also concerned that the snapshot could have been corrupted after 3.c?

I also updated the "Changes to Leader Election" section to make this a bit clearer.

Thanks,
Jose
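The validation steps 3.a-3.c above can be sketched as follows. This is a minimal illustration under assumed file naming (a ".part" suffix for the temporary file), not the real KafkaRaftClient logic:

```python
import os
import zlib

def validate_and_commit_snapshot(tmp_path: str, checkpoints_dir: str,
                                 expected_crc: int) -> str:
    """After all chunks have been written to a temporary file (step 3.a),
    verify the CRC of its contents (step 3.b) and atomically move it into
    the permanent "checkpoints" directory (step 3.c). Until the rename,
    leader election behaves as if the snapshot did not exist."""
    with open(tmp_path, "rb") as f:
        crc = zlib.crc32(f.read())
    if crc != expected_crc:
        os.remove(tmp_path)  # corrupt transfer: discard and re-fetch
        raise ValueError("snapshot CRC mismatch")

    final_path = os.path.join(
        checkpoints_dir, os.path.basename(tmp_path).removesuffix(".part"))
    os.rename(tmp_path, final_path)  # atomic within one filesystem on POSIX
    return final_path
```

The atomicity of the rename within a single filesystem is what makes step 3.c safe: a crash leaves either the old state (a ".part" file that is ignored) or the new state (a complete, validated snapshot), never a half-visible snapshot.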
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Thank you for the quick response Jun. Excuse the delayed response but I wanted to confirm some things regarding IBP. See comments below.

Here are my changes to the KIP:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=30=28

> 40. LBO: Code wise, logStartOffset is used in so many places like Log,
> ReplicaManager, LogCleaner, ReplicaFetcher, checkpoint files, etc. I am not
> sure if it's worth renaming in all those places. If the main concern is to
> avoid confusion, we can just always spell out logStartOffset.

Done. Keeping it as LogStartOffset is better. I was also concerned with external tools that may be generating code from the JSON schema.

> 41. metadata.snapshot.min.cleanable.ratio: Since the snapshot could be
> empty initially, it's probably better to define the ratio as new_data_size
> / (new_data_size + snapshot_size). This avoids the dividing-by-zero issue
> and is also consistent with the cleaner ratio definition for compacted
> topics.

I am assuming that snapshot_size is the size of the largest snapshot on disk, is this correct? If we use this formula then we will generate snapshots very quickly if the snapshot on disk is zero or very small.

In general what we care about is whether the replicated log has a lot of records that delete or update records in the snapshot. I was thinking of something along the lines of the following formula:

(size of deleted snapshot records + size of updated records) / (total size of snapshot), where total size of snapshot is greater than zero.
0, where total size of snapshot is zero

This means that in the extreme case where the replicated log only contains "addition" records then we never generate a snapshot. I think this is the desired behavior since generating a snapshot will consume disk bandwidth without saving disk space. What do you think?

> 42.
> FetchSnapshotRequest: Since this is designed to fetch more than one
> partition, it seems it's useful to have a per-partition maxBytes, in
> addition to the request level maxBytes, just like a Fetch request?

Yeah, we have debated this in another thread from Jason. The argument is that MaxBytes at the top level is all that we need if we implement the following heuristic:

1. Compute the average max bytes per partition by dividing the max by the number of partitions in the request.
2. For all of the partitions with remaining bytes less than this average max bytes, send all of those bytes and sum the remaining bytes.
3. For the remaining partitions send at most the average max bytes plus the average remaining bytes.

Note that this heuristic will only be performed once and not at worst N times for N partitions.

What do you think? Besides consistency with Fetch requests, is there another reason to have MaxBytes per partition?

> 43. FetchSnapshotResponse:
> 43.1 I think the confusing part for OFFSET_OUT_OF_RANGE is
> that FetchSnapshotRequest includes EndOffset. So, OFFSET_OUT_OF_RANGE seems
> to suggest that the provided EndOffset is wrong, which is not the intention
> for the error code.

Yeah. Added a new error called POSITION_OUT_OF_RANGE.

> 43.2 Position field seems to be the same as the one in
> FetchSnapshotRequest. If we have both, should the requester verify the
> consistency between the two values and what should the requester do if the
> two values don't match?

Yeah, the Position in the response will be the same value as the Position in the request. I was thinking of only verifying Position against the state of the temporary snapshot file on disk. If Position is not equal to the size of the file then reject the response and send another FetchSnapshot request.

> 44. metric: Would a metric that captures the lag in offset between the last
> snapshot and the logEndOffset be useful?

Yes.
How about the difference between the last snapshot offset and the high-watermark? Snapshots can only be created up to the high-watermark. Added this metric. Let me know if you still think we need a metric for the difference between the largest snapshot end offset and the high-watermark.

> 45. It seems the KIP assumes that every voter (leader and follower) and
> observer has a local replicated log for __cluster_metadata. It would be
> useful to make that clear in the overview section.

Updated the overview section. I think that this decision affects the section "Changes to Leader Election". That section should not affect observers since they don't participate in leader elections. It also affects the section "Validation of Snapshot and Log" but it should be possible to fix that section if observers don't have the replicated log on disk.

> 46. Does this KIP cover upgrading from older versions of Kafka? If so, do
> we need IBP to guard the usage of the modified FetchRequest and the new
> FetchSnapshotRequest? If not, could we make it clear that upgrading will be
> covered somewhere else?

In short, I don't think we need to increase the IBP. When we implement snapshots for other topics like __consumer_offset and
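The snapshot-trigger logic discussed in this thread (the cleanable-ratio formula Jose proposes, combined with the requirement that both configurations be satisfied) can be sketched as follows. This is an illustration only; variable names and the default ratio are assumptions, and the actual configuration semantics are defined in the KIP:

```python
def cleanable_ratio(deleted_bytes: int, updated_bytes: int,
                    snapshot_total_bytes: int) -> float:
    """Fraction of the current snapshot that the replicated log has
    invalidated via delete or update records. Defined as 0 when there is
    no snapshot yet, avoiding division by zero."""
    if snapshot_total_bytes == 0:
        return 0.0
    return (deleted_bytes + updated_bytes) / snapshot_total_bytes

def should_generate_snapshot(deleted_bytes: int, updated_bytes: int,
                             snapshot_total_bytes: int, new_log_bytes: int,
                             min_cleanable_ratio: float = 0.5,
                             min_records_size: int = 20 * 1024 * 1024) -> bool:
    """Both conditions from the thread must hold: the snapshot is dirty
    enough, AND enough new bytes accumulated in the replicated log between
    the latest snapshot and the high-watermark (20MB default)."""
    ratio = cleanable_ratio(deleted_bytes, updated_bytes, snapshot_total_bytes)
    return ratio >= min_cleanable_ratio and new_log_bytes >= min_records_size

# A log containing only "addition" records never triggers a snapshot:
print(should_generate_snapshot(0, 0, 1_000_000, 50 * 1024 * 1024))  # False
```

Note how the ratio reproduces the desired extreme case from the discussion: a log of pure additions keeps the ratio at 0, so no snapshot is generated and no disk bandwidth is spent without saving disk space.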
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Hello Jose,

Thanks for the replies and the KIP updates. Just want to clarify one more thing regarding my previous comment 3): I understand that when a snapshot has completed loading, then we can use it in our handling logic of vote requests. And I understand that:

1) Before a snapshot has been completely received (e.g. if we've only received a subset of the "chunks"), then we just handle vote requests as if there's no snapshot yet.

2) After a snapshot has been completely received and loaded into main memory, we can handle vote requests as of the received snapshot.

What I'm wondering is, in between these two synchronization barriers, after all the snapshot chunks have been received but before they have been completely parsed and loaded into the memory's metadata cache, if we receive a request (note they may be handled by different threads, hence concurrently), what should we do? Or are you proposing that the FetchSnapshot request would also be handled in that single-threaded raft client loop so it is in order with all other requests? If that's the case then we do not have any concurrency issues to worry about, but then on the other hand the reception of the last snapshot chunk and loading it into main memory may also take a long time, during which a client may not be able to handle any other requests.

Guozhang

On Wed, Sep 30, 2020 at 10:57 AM Jun Rao wrote:

> Hi, Jose,
>
> Thanks for the updated KIP. A few more comments below.
>
> 40. LBO: Code wise, logStartOffset is used in so many places like Log,
> ReplicaManager, LogCleaner, ReplicaFetcher, checkpoint files, etc. I am not
> sure if it's worth renaming in all those places. If the main concern is to
> avoid confusion, we can just always spell out logStartOffset.
>
> 41. metadata.snapshot.min.cleanable.ratio: Since the snapshot could be
> empty initially, it's probably better to define the ratio as new_data_size
> / (new_data_size + snapshot_size).
> This avoids the dividing-by-zero issue
> and is also consistent with the cleaner ratio definition for compacted
> topics.
>
> 42. FetchSnapshotRequest: Since this is designed to fetch more than one
> partition, it seems it's useful to have a per-partition maxBytes, in
> addition to the request level maxBytes, just like a Fetch request?
>
> 43. FetchSnapshotResponse:
> 43.1 I think the confusing part for OFFSET_OUT_OF_RANGE is
> that FetchSnapshotRequest includes EndOffset. So, OFFSET_OUT_OF_RANGE seems
> to suggest that the provided EndOffset is wrong, which is not the intention
> for the error code.
> 43.2 Position field seems to be the same as the one in
> FetchSnapshotRequest. If we have both, should the requester verify the
> consistency between the two values and what should the requester do if the
> two values don't match?
>
> 44. metric: Would a metric that captures the lag in offset between the last
> snapshot and the logEndOffset be useful?
>
> 45. It seems the KIP assumes that every voter (leader and follower) and
> observer has a local replicated log for __cluster_metadata. It would be
> useful to make that clear in the overview section.
>
> 46. Does this KIP cover upgrading from older versions of Kafka? If so, do
> we need IBP to guard the usage of the modified FetchRequest and the new
> FetchSnapshotRequest? If not, could we make it clear that upgrading will be
> covered somewhere else?
>
> Thanks,
>
> Jun
>
> On Mon, Sep 28, 2020 at 9:25 PM Jose Garcia Sancio wrote:
>
> > Hi Guozhang,
> >
> > Thanks for your feedback. It was very helpful. See my comments below.
> >
> > Changes to the KIP:
> >
> > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=28=27
> >
> > On Sun, Sep 27, 2020 at 9:02 PM Guozhang Wang wrote:
> > >
> > > Hello Jose,
> > >
> > > Thanks for the KIP. Overall it looks great. I have a few meta / minor
> > > questions, or maybe just clarifications below:
> > >
> > > Meta:
> > >
> > > 1.
> > > I want to clarify: would only the active controller generate
> > > snapshots, OR would any voter in the quorum generate snapshots, OR
> > > would even observers generate snapshots? Originally I thought it was
> > > the latter case, but reading through the doc I got confused by some
> > > paragraphs. E.g. you mentioned snapshots are generated by the Controller
> > > module, and observers would not have that module.
> >
> > Sorry for the confusion and inconsistency here. Every replica of the
> > cluster metadata topic partition will generate a snapshot. That
> > includes the voters (leader and followers) and observers. In this KIP
> > the leader is the Active Controller, the voters are the Kafka
> > Controllers and the observers are the Metadata Cache.
> >
> > I went through the KIP again and made sure to enumerate both Kafka
> > Controllers and Metadata Cache when talking about snapshot generation
> > and loading.
> >
> > I renamed the new configurations to be prefixed by metadata instead of
> > controller.
> >
> > I moved the terminology
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Hi, Jose,

Thanks for the updated KIP. A few more comments below.

40. LBO: Code wise, logStartOffset is used in so many places like Log, ReplicaManager, LogCleaner, ReplicaFetcher, checkpoint files, etc. I am not sure if it's worth renaming in all those places. If the main concern is to avoid confusion, we can just always spell out logStartOffset.

41. metadata.snapshot.min.cleanable.ratio: Since the snapshot could be empty initially, it's probably better to define the ratio as new_data_size / (new_data_size + snapshot_size). This avoids the dividing-by-zero issue and is also consistent with the cleaner ratio definition for compacted topics.

42. FetchSnapshotRequest: Since this is designed to fetch more than one partition, it seems it's useful to have a per-partition maxBytes, in addition to the request level maxBytes, just like a Fetch request?

43. FetchSnapshotResponse:
43.1 I think the confusing part for OFFSET_OUT_OF_RANGE is that FetchSnapshotRequest includes EndOffset. So, OFFSET_OUT_OF_RANGE seems to suggest that the provided EndOffset is wrong, which is not the intention for the error code.
43.2 Position field seems to be the same as the one in FetchSnapshotRequest. If we have both, should the requester verify the consistency between the two values and what should the requester do if the two values don't match?

44. metric: Would a metric that captures the lag in offset between the last snapshot and the logEndOffset be useful?

45. It seems the KIP assumes that every voter (leader and follower) and observer has a local replicated log for __cluster_metadata. It would be useful to make that clear in the overview section.

46. Does this KIP cover upgrading from older versions of Kafka? If so, do we need IBP to guard the usage of the modified FetchRequest and the new FetchSnapshotRequest? If not, could we make it clear that upgrading will be covered somewhere else?

Thanks,

Jun

On Mon, Sep 28, 2020 at 9:25 PM Jose Garcia Sancio wrote:

> Hi Guozhang,
>
> Thanks for your feedback.
> It was very helpful. See my comments below.
>
> Changes to the KIP:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=28=27
>
> On Sun, Sep 27, 2020 at 9:02 PM Guozhang Wang wrote:
> >
> > Hello Jose,
> >
> > Thanks for the KIP. Overall it looks great. I have a few meta / minor
> > questions, or maybe just clarifications below:
> >
> > Meta:
> >
> > 1. I want to clarify: would only the active controller generate
> > snapshots, OR would any voter in the quorum generate snapshots, OR
> > would even observers generate snapshots? Originally I thought it was the
> > latter case, but reading through the doc I got confused by some
> > paragraphs. E.g. you mentioned snapshots are generated by the Controller
> > module, and observers would not have that module.
>
> Sorry for the confusion and inconsistency here. Every replica of the
> cluster metadata topic partition will generate a snapshot. That
> includes the voters (leader and followers) and observers. In this KIP
> the leader is the Active Controller, the voters are the Kafka
> Controllers and the observers are the Metadata Cache.
>
> I went through the KIP again and made sure to enumerate both Kafka
> Controllers and Metadata Cache when talking about snapshot generation
> and loading.
>
> I renamed the new configurations to be prefixed by metadata instead of
> controller.
>
> I moved the terminology section to the top.
>
> > 2. Following on Jun's previous comment: currently the __cluster_metadata
> > log is replicated on ALL brokers since all voters and observers would
> > replicate that topic. I know this may be out of the scope of this KIP
> > but I think maybe only letting the voters replicate (and periodically
> > truncate) the log while observers only maintain the in-memory state and
> > snapshots is a good trade-off here, assuming snapshot loading is
> > relatively fast.
>
> This is a good idea and optimization. It would save a write.
I think > we need to think about the implication to KIP-642, the dynamic quorum > reassignment KIP, if we end up allowing observers to get "promoted" to > voters. > > > > > 3. When a raft client is in the middle of loading a snapshot, should it > > reject any vote / begin-/end-/describe-quorum requests at the time? More > > generally, while a snapshot is being loaded, how should we treat the > > current state of the client when handling Raft requests. > > Re: requesting votes and granting votes. > > In the section "Changes to Leader Election", I think this section was > improved since your review. I mentioned that the raft client needs to > look at: > > 1. latest/largest snapshot epoch and end offset > 2. the LEO of the replicated log > > The voters should use the latest/largest of these two during the > election process. > > Re: quorum state > > For KIP-595 and KIP-630 the snapshot doesn't include any quorum > information. This may change in KIP-642. > > > > > Minor: > > > >
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Hi Guozhang, Thanks for your feedback. It was very helpful. See my comments below. Changes to the KIP: https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=28=27 On Sun, Sep 27, 2020 at 9:02 PM Guozhang Wang wrote: > > Hello Jose, > > Thanks for the KIP. Overall it looks great. I have a few meta / minor > question, or maybe just clarifications below: > > Meta: > > 1. I want to clarify that if only the active controller would generate > snapshots, OR would any voter in the quorum would generate snapshots, OR > would even observers generate snapshots? Originally I thought it was the > latter case, but I think reading through the doc I got confused by some > paragraphs. E.g. you mentioned snapshots are generated by the Controller > module, and observers would not have that module. Sorry for the confusion and inconsistency here. Every replica of the cluster metadata topic partition will generate a snapshot. That includes the voters (leader and followers) and observers. In this KIP the leader is the Active Controller, the voters are the Kafka Controllers and the observers are the Metadata Cache. I went through the KIP again and made sure to enumerate both Kafka Controllers and Metadata Cache when talking about snapshot generation and loading. I renamed the new configurations to be prefixed by metadata instead of controller. I moved the terminology section to the top. > > 2. Following on Jun's previous comment: currently the __consumer_metadata > log is replicated on ALL brokers since all voters and observers would > replicate that topic. I know this may be out of the scope of this KIP but I > think maybe only letting the voters to replicate (and periodically > truncate) the log while observers only maintain the in-memory state and > snapshots is a good trade-off here, assuming snapshot loading is relatively > fast. This is a good idea and optimization. It would save a write. 
I think we need to think about the implications for KIP-642, the dynamic quorum reassignment KIP, if we end up allowing observers to get "promoted" to voters. > > 3. When a raft client is in the middle of loading a snapshot, should it > reject any vote / begin-/end-/describe-quorum requests at the time? More > generally, while a snapshot is being loaded, how should we treat the > current state of the client when handling Raft requests. Re: requesting votes and granting votes. I think the "Changes to Leader Election" section has improved since your review. I mentioned that the raft client needs to look at: 1. latest/largest snapshot epoch and end offset 2. the LEO of the replicated log The voters should use the latest/largest of these two during the election process. Re: quorum state For KIP-595 and KIP-630 the snapshot doesn't include any quorum information. This may change in KIP-642. > > Minor: > > 4. "All of the live replicas (followers and observers) have replicated LBO". > Today the raft layer does not yet maintain LBO across all replicas, is this > information kept in the controller layer? I'm asking because I do not see > relevant docs in KIP-631 and hence a bit confused which layer is > responsible for bookkeeping the LBOs of all replicas. This is not minor! :). This should be done in the raft client as part of the fetch protocol. Note that LBO is just a rename of log start offset. If the current raft implementation doesn't manage this information then we will have to implement it as part of this KIP (KIP-630). > 5. "Followers and observers will increase their log begin offset to the > value sent on the fetch response as long as the local Kafka Controller and > Metadata Cache has generated a snapshot with an end offset greater than or > equal to the new log begin offset."
Not sure I follow this: 1) If observers > do not generate snapshots since they do not have a Controller module on > them, then it is possible that observers do not have any snapshots at all > if they do not get one from the leader, in that case they would never > truncate the logs locally; Observers will have a Metadata Cache which will be responsible for generating snapshots. > 2) could you clarify on "value sent on the fetch > response", are you referring to the "HighWatermark", or "LogStartOffset" in > the schema, or some other fields? The log begin offset is the same as the log start offset. This KIP renames that field in the fetch response. I am starting to think that renaming this field in this KIP is not worth it. What do you think? > > 6. The term "SnapshotId" is introduced without definition at first. My > understanding is it's defined as a combo of <epoch, endOffset>, could you > clarify if this is the case? Good point. I added this sentence to the Snapshot Format section and terminology section: "Each snapshot is uniquely identified by a SnapshotId, the epoch and end offset of the records in the replicated log included in the snapshot." > BTW I think the term "endOffset" is a term >
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Thanks for the reply Jun. Some comments below. Here are the changes: https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=27=26 > 20. Good point on metadata cache. I think we need to make a decision > consistently. For example, if we decide that dedicated voter nodes don't > serve metadata requests, then we don't need to expose the voters host/port > to the client. Which KIP should make this decision? Makes sense. My opinion is that this should be addressed in KIP-631 since I think exposing this information is independent of snapshotting. Note that I think there is a long term goal to make the __cluster_metadata topic partition readable by a Kafka Consumer but we can address that in a future KIP. > 31. controller.snapshot.minimum.records: For a compacted topic, we use a > ratio instead of the number of records to determine when to compact. This > has some advantages. For example, if we use > controller.snapshot.minimum.records and set it to 1000, then it will > trigger the generation of a new snapshot when the existing snapshot is > either 10MB or 1GB. Intuitively, the larger the snapshot, the more > expensive it is to write to disk. So, we want to wait for more data to be > accumulated before generating the next snapshot. The ratio based setting > achieves this. For instance, a 50% ratio requires 10MB/1GB more data to be > accumulated to regenerate a 10MB/1GB snapshot respectively. I agree. I proposed using a simple algorithm like "controller.snapshot.minimum.records" since calculating a dirty ratio may not be straightforward when replicated log records don't map 1:1 to snapshot records. But I think we can implement a heuristic for this. There is a small complication when generating the first snapshot but it should be implementable. Here is the latest wording of the "When to Snapshot" section: If the Kafka Controller generates a snapshot too frequently then it can negatively affect the performance of the disk. 
If it doesn't generate a snapshot often enough then it can increase the amount of time it takes to load its state into memory and it can increase the amount of space taken up by the replicated log. The Kafka Controller will have a new configuration option controller.snapshot.min.cleanable.ratio. If the number of snapshot records that have changed (deleted or modified) between the latest snapshot and the current in-memory state divided by the total number of snapshot records is greater than controller.snapshot.min.cleanable.ratio, then the Kafka Controller will perform a new snapshot. Note that new snapshot records don't count against this ratio. If a new snapshot record was added since the last snapshot then it doesn't affect the dirty ratio. If a snapshot record was added and then modified or deleted then it counts against the dirty ratio. > 32. max.replication.lag.ms: It seems this is specific to the metadata > topic. Could we make that clear in the name? Good catch. Done.
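The dirty-ratio rule described above could be sketched as follows. This is a minimal illustration, not Kafka's implementation; the function name and the handling of the empty initial state are assumptions.

```python
# Illustrative sketch of the controller.snapshot.min.cleanable.ratio check
# described above; names and the empty-state handling are hypothetical.
def should_snapshot(changed_records: int, total_records: int,
                    min_cleanable_ratio: float) -> bool:
    # changed_records: snapshot records deleted or modified since the latest
    # snapshot (newly added records do not count against the ratio).
    # total_records: total number of records in the latest snapshot.
    if total_records == 0:
        return False  # no snapshot yet; the first snapshot is a special case
    return changed_records / total_records > min_cleanable_ratio

# With a 50% ratio, 600 changed records out of 1000 trigger a snapshot:
print(should_snapshot(600, 1000, 0.5))  # True
print(should_snapshot(400, 1000, 0.5))  # False
```

Jun's alternative definition elsewhere in the thread, new_data_size / (new_data_size + snapshot_size), avoids the divide-by-zero case without the special-casing.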
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Thanks Jason. Some comments below. > > Generally the number of snapshots on disk will be one. I suspect that > users will want some control over this. We can add a configuration > option that doesn't delete, or advances the log begin offset past, the > N latest snapshots. We can set the default value for this > configuration to two. What do you think? > > I know Zookeeper has a config like this, but I'm not sure how frequently it > is used. I would probably suggest we pick a good number of snapshots (maybe > just 1-2) and leave it out of the configs. > Sounds good to me. If followers/observers are keeping up with the Leader, I think the description in section "When to Increase the Log Begin Offset" will lead to one snapshot on disk in the steady state. > > We could use the same configuration we have for Fetch but to avoid > confusion let's add two more configurations for > "replica.fetch.snapshot.max.bytes" and > "replica.fetch.snapshot.response.max.bytes". > > My vote would probably be to reuse the existing configs. We can add new > configs in the future if the need emerges, but I can't think of a good > reason why a user would want these to be different. Sounds good to me. Removed the configuration from the KIP. Updated the FetchSnapshot request handling section to mention that the replica.fetch.response.max.bytes configuration will be used. > By the way, it looks like the FetchSnapshot schema now has both a partition > level and a top level max bytes. Do we need both? Kept the top-level MaxBytes and removed the topic-partition-level MaxBytes.
> > Just to be clear, I think it is important to include the snapshot epoch so > that we /can/ reason about the snapshot state in the presence of data loss. > However, if we excluded data loss, then this would strictly speaking be > unnecessary because a snapshot offset would always be uniquely defined > (since we do not snapshot above the high watermark). Hence it would be safe > to leave LastFetchedEpoch undefined. Anyway, I think we're on the same page > about the behavior, just thought it might be useful to clarify the > reasoning. Okay. Even though you are correct that the LastFetchedEpoch shouldn't matter since the follower is fetching committed data, I still think that the follower should send the epoch of the snapshot as the LastFetchedEpoch for extra validation on the leader. What do you think?
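Jose's point above, that a follower with a snapshot and an empty log should report the snapshot's epoch as the LastFetchedEpoch, could be sketched like this. The helper is hypothetical, not the actual raft client API, and it treats the snapshot end offset as exclusive, per the KIP's naming.

```python
# Hypothetical sketch: pick the (fetch offset, LastFetchedEpoch) a follower
# sends, given its log state and its latest snapshot. Not the Kafka API.
def next_fetch_position(log_end_offset, log_last_epoch,
                        snapshot_end_offset, snapshot_epoch):
    have_snapshot = snapshot_end_offset is not None
    if have_snapshot and (log_end_offset is None
                          or log_end_offset <= snapshot_end_offset):
        # Log is empty (or entirely covered by the snapshot): fetch from
        # the snapshot's end offset and report the snapshot's epoch.
        return snapshot_end_offset, snapshot_epoch
    return log_end_offset, log_last_epoch

# Snapshot with end offset 100 and epoch 3, empty log:
print(next_fetch_position(None, None, 100, 3))  # (100, 3)
# Log ahead of the snapshot: use the log's own position:
print(next_fetch_position(150, 4, 100, 3))      # (150, 4)
```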
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Hello Jose, Thanks for the KIP. Overall it looks great. I have a few meta / minor questions, or maybe just clarifications below: Meta: 1. I want to clarify whether only the active controller would generate snapshots, OR any voter in the quorum would generate snapshots, OR even observers would generate snapshots? Originally I thought it was the latter case, but I think reading through the doc I got confused by some paragraphs. E.g. you mentioned snapshots are generated by the Controller module, and observers would not have that module. 2. Following on Jun's previous comment: currently the __cluster_metadata log is replicated on ALL brokers since all voters and observers would replicate that topic. I know this may be out of the scope of this KIP but I think maybe only letting the voters replicate (and periodically truncate) the log while observers only maintain the in-memory state and snapshots is a good trade-off here, assuming snapshot loading is relatively fast. 3. When a raft client is in the middle of loading a snapshot, should it reject any vote / begin-/end-/describe-quorum requests at the time? More generally, while a snapshot is being loaded, how should we treat the current state of the client when handling Raft requests? Minor: 4. "All of the live replicas (followers and observers) have replicated LBO". Today the raft layer does not yet maintain LBO across all replicas, is this information kept in the controller layer? I'm asking because I do not see relevant docs in KIP-631 and hence a bit confused which layer is responsible for bookkeeping the LBOs of all replicas. 5. "Followers and observers will increase their log begin offset to the value sent on the fetch response as long as the local Kafka Controller and Metadata Cache has generated a snapshot with an end offset greater than or equal to the new log begin offset."
Not sure I follow this: 1) If observers do not generate snapshots since they do not have a Controller module on them, then it is possible that observers do not have any snapshots at all if they do not get one from the leader, in that case they would never truncate the logs locally; 2) could you clarify on "value sent on the fetch response", are you referring to the "HighWatermark", or "LogStartOffset" in the schema, or some other fields? 6. The term "SnapshotId" is introduced without definition at first. My understanding is it's defined as a combo of <epoch, endOffset>, could you clarify if this is the case? BTW I think the term "endOffset" is a term used per log, and maybe calling the offset part of the SnapshotId "nextOffset" is better since that offset is likely already filled with a record. 7. This is a very nit one: "If the latest snapshot has an epoch E and end offset O and it is newer than the LEO of the replicated log, then the replica must set the LBO and LEO to O." On wiki `O` and `0` look very much the same and that confused me a couple of times... I'd suggest we phrase such occasions as "an epoch e1 and offset o1". Also for LEO since we would not really know what would be its epoch (since it may be bumped) when comparing we only care about the offset and not about the epoch right? If yes, please clarify that in the doc as well. 8. "LEO - log end offset - the largest offset and epoch that has been written to disk." I think LEO is the "next" offset to be written to the log right? Also it seems consistent with your diagrams. 9. "... will send a vote request and response as if they had an empty log." Not sure I completely follow this, do you mean that they will set "LastOffsetEpoch/LastOffset" as "-1/0" when sending a vote request, and upon receiving a vote request it would compare the request's "LastOffsetEpoch/LastOffset" with "-1/0" as well? 10. In the FetchSnapshot response schema, just to clarify, the "Position": "The byte position within the snapshot."
is referring to the starting byte position of the returned snapshot data, right? Thanks, Guozhang On Fri, Sep 25, 2020 at 4:42 PM Jun Rao wrote: > Hi, Jose, > > Thanks for the reply. A few more comments below. > > 20. Good point on metadata cache. I think we need to make a decision > consistently. For example, if we decide that dedicated voter nodes don't > serve metadata requests, then we don't need to expose the voters host/port > to the client. Which KIP should make this decision? > > 31. controller.snapshot.minimum.records: For a compacted topic, we use a > ratio instead of the number of records to determine when to compact. This > has some advantages. For example, if we use > controller.snapshot.minimum.records and set it to 1000, then it will > trigger the generation of a new snapshot when the existing snapshot is > either 10MB or 1GB. Intuitively, the larger the snapshot, the more > expensive it is to write to disk. So, we want to wait for more data to be > accumulated before generating the next snapshot. The ratio based setting > achieves this. For instance, a 50% ratio requires 10MB/1GB more data to
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Hi, Jose, Thanks for the reply. A few more comments below. 20. Good point on metadata cache. I think we need to make a decision consistently. For example, if we decide that dedicated voter nodes don't serve metadata requests, then we don't need to expose the voters host/port to the client. Which KIP should make this decision? 31. controller.snapshot.minimum.records: For a compacted topic, we use a ratio instead of the number of records to determine when to compact. This has some advantages. For example, if we use controller.snapshot.minimum.records and set it to 1000, then it will trigger the generation of a new snapshot when the existing snapshot is either 10MB or 1GB. Intuitively, the larger the snapshot, the more expensive it is to write to disk. So, we want to wait for more data to be accumulated before generating the next snapshot. The ratio based setting achieves this. For instance, a 50% ratio requires 10MB/1GB more data to be accumulated to regenerate a 10MB/1GB snapshot respectively. 32. max.replication.lag.ms: It seems this is specific to the metadata topic. Could we make that clear in the name? Thanks, Jun On Fri, Sep 25, 2020 at 12:43 PM Jose Garcia Sancio wrote: > Thanks for the detailed feedback Jun. > > The changes are here: > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=25=24 > > Here is a summary of the change to the KIP: > 1. Use end offset for snapshot and snapshot id. > 2. Include default for all of the new configuration options. > 3. Provide more detail in the response handling for FetchSnapshot > > > 20. "Metadata Cache: The component that generates snapshots, reads > > snapshots and reads logs for observer replicas of the topic partition > > __cluster_metadata." It seems this is needed on every broker, not just > > observers? > > Yes. I think we need some clarification and consensus here. Some > people are advocating for Kafka brokers to only be observers and would > only contain a Metadata Cache. 
With the Kafka Controllers being > separate nodes that are voters (follower, candidate or leader) and not > observers. Others are advocating for Kafka Brokers to be able to host > both the Kafka Controller and the Metadata Cache. In this case if the > Controller and Metadata Cache are sharing the same underlying topic > partition then we need to make sure that we unify the snapshotting > logic. > > I would like to be able to unify the in-memory state for both the > Kafka Controller and the Metadata Cache so that we can share the same > replicated log and snapshot. > > > 21. Our current convention is to use exclusive offset for naming > > checkpoint files. For example, a producer snapshot file of 1234.snapshot > > means that the file includes the producer state up to, but not including > > offset 1234. So, we probably want to follow the same convention for the > new > > checkpoint file. > > Thanks for pointing this out. This sounds good to me. This was a > detail that I was struggling with when reading the replication code. > Updated the KIP. Wherever the offset is exclusive, I renamed it to > "end offset" (EndOffset). > > > 22. Snapshot Format: KIP-631 only defines the format for individual > > records. It seems that we need to define the container format here. For > > example, we need to store the length of each record. Also, does the > > snapshot file need a CRC field? > > Yes. I have added more information on this. In summary, we are going > to use Kafka's log format version 2. This will give us support for > compression and CRC at the record batch level. The Kafka Controller > and Metadata Cache can control how big they want the batches to be. > > > 23. Could we provide the default value for the new > > configs controller.snapshot.minimum.records and max.replication.lag.ms. > > Also, max.replication.lag.ms seems to just control the snapshot > frequency > > by time and not directly relate to replication. 
So, maybe it should be > > called sth like controller.snapshot.minimum.interval.ms? > > "max.replication.lag.ms" is very similar to "replica.lag.time.max.ms". > Kafka uses "replica.lag.time.max.ms" to make progress on the > high-watermark when replicas are slow or offline. We want to use > "max.replication.lag.ms" to make progress on the LBO when replicas are > slow or offline. These very similar names are confusing. How about > "replica.lbo.lag.time.max.ms"? > > How often snapshotting will happen is determined by > "controller.snapshot.minimum.records". > > > 24. "Kafka allows the clients to delete records that are less than a > given > offset by using the DeleteRecords RPC. Those requests will be validated > > using the same logic enumerated above." Hmm, should we allow deleteRecord > > on the metadata topic? If we do, does it trim the snapshot accordingly? > > Yeah. After thinking about it some more, I don't think we should > allow DeleteRecords to succeed on the __cluster_metadata partition. > For the error that we return it
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Thanks for the detailed feedback Jun. The changes are here: https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=25=24 Here is a summary of the change to the KIP: 1. Use end offset for snapshot and snapshot id. 2. Include default for all of the new configuration options. 3. Provide more detail in the response handling for FetchSnapshot > 20. "Metadata Cache: The component that generates snapshots, reads > snapshots and reads logs for observer replicas of the topic partition > __cluster_metadata." It seems this is needed on every broker, not just > observers? Yes. I think we need some clarification and consensus here. Some people are advocating for Kafka brokers to only be observers and would only contain a Metadata Cache. With the Kafka Controllers being separate nodes that are voters (follower, candidate or leader) and not observers. Others are advocating for Kafka Brokers to be able to host both the Kafka Controller and the Metadata Cache. In this case if the Controller and Metadata Cache are sharing the same underlying topic partition then we need to make sure that we unify the snapshotting logic. I would like to be able to unify the in-memory state for both the Kafka Controller and the Metadata Cache so that we can share the same replicated log and snapshot. > 21. Our current convention is to use exclusive offset for naming > checkpoint files. For example, a producer snapshot file of 1234.snapshot > means that the file includes the producer state up to, but not including > offset 1234. So, we probably want to follow the same convention for the new > checkpoint file. Thanks for pointing this out. This sounds good to me. This was a detail that I was struggling with when reading the replication code. Updated the KIP. Wherever the offset is exclusive, I renamed it to "end offset" (EndOffset). > 22. Snapshot Format: KIP-631 only defines the format for individual > records. It seems that we need to define the container format here. 
For > example, we need to store the length of each record. Also, does the > snapshot file need a CRC field? Yes. I have added more information on this. In summary, we are going to use Kafka's log format version 2. This will give us support for compression and CRC at the record batch level. The Kafka Controller and Metadata Cache can control how big they want the batches to be. > 23. Could we provide the default value for the new > configs controller.snapshot.minimum.records and max.replication.lag.ms. > Also, max.replication.lag.ms seems to just control the snapshot frequency > by time and not directly relate to replication. So, maybe it should be > called sth like controller.snapshot.minimum.interval.ms? "max.replication.lag.ms" is very similar to "replica.lag.time.max.ms". Kafka uses "replica.lag.time.max.ms" to make progress on the high-watermark when replicas are slow or offline. We want to use "max.replication.lag.ms" to make progress on the LBO when replicas are slow or offline. These very similar names are confusing. How about "replica.lbo.lag.time.max.ms"? How often snapshotting will happen is determined by "controller.snapshot.minimum.records". > 24. "Kafka allows the clients to delete records that are less than a given > offset by using the DeleteRecords RPC. Those requests will be validated > using the same logic enumerated above." Hmm, should we allow deleteRecord > on the metadata topic? If we do, does it trim the snapshot accordingly? Yeah. After thinking about it some more, I don't think we should allow DeleteRecords to succeed on the __cluster_metadata partition. For the error that we return, it looks like our options are the existing "POLICY_VIOLATION" (the description for this error is "Request parameters do not satisfy the configured policy.") or introducing a new error. I think we should just return POLICY_VIOLATION, what do you think? > 25.
"The followers of the __cluster_metadata topic partition will > concurrently fetch the snapshot and replicated log. This means that > candidates with incomplete snapshots will send a vote request with a > LastOffsetEpoch of -1 and a LastOffset of -1 no matter the LEO of the > replicated log." My understanding is that a follower will either fetch from > the snapshot or the log, but not both at the same time. Could you explain > how the concurrent part works? Also, what's an incomplete snapshot? Yes. I rewrote this section based on your comment and Jason's comments. Let me know if this addresses your concerns. https://cwiki.apache.org/confluence/display/KAFKA/KIP-630:+Kafka+Raft+Snapshot#KIP630:KafkaRaftSnapshot-ChangestoLeaderElection > > 26. FetchRequest: > 26.1 Handling Fetch Request: I agree with Jason that SnapshotOffsetAndEpoch > already tells us the next offset to fetch. So, we don't need to > set NextOffsetAndEpoch in the response. Agreed. The response will set one or the other. If SnapshotId (field renamed in the latest version of the KIP) is set then the
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Thanks Jose. Makes sense overall. A few specific responses below: > Generally the number of snapshots on disk will be one. I suspect that users will want some control over this. We can add a configuration option that doesn't delete, or advances the log begin offset past, the N latest snapshots. We can set the default value for this configuration to two. What do you think? I know Zookeeper has a config like this, but I'm not sure how frequently it is used. I would probably suggest we pick a good number of snapshots (maybe just 1-2) and leave it out of the configs. > We could use the same configuration we have for Fetch but to avoid confusion let's add two more configurations for "replica.fetch.snapshot.max.bytes" and "replica.fetch.snapshot.response.max.bytes". My vote would probably be to reuse the existing configs. We can add new configs in the future if the need emerges, but I can't think of a good reason why a user would want these to be different. By the way, it looks like the FetchSnapshot schema now has both a partition level and a top level max bytes. Do we need both? > The snapshot epoch will be used when ordering snapshots and more importantly when setting the LastFetchedEpoch in the Fetch request. It is possible for a follower to have a snapshot and an empty log. In this case the follower will use the epoch of the snapshot when setting the LastFetchEpoch in the Fetch request. Just to be clear, I think it is important to include the snapshot epoch so that we /can/ reason about the snapshot state in the presence of data loss. However, if we excluded data loss, then this would strictly speaking be unnecessary because a snapshot offset would always be uniquely defined (since we do not snapshot above the high watermark). Hence it would be safe to leave LastFetchedEpoch undefined. Anyway, I think we're on the same page about the behavior, just thought it might be useful to clarify the reasoning. 
Thanks, Jason On Thu, Sep 24, 2020 at 1:19 PM Jose Garcia Sancio wrote: > Thanks for the feedback Jason. > > I have made the following changes to the KIP: > 1. Better explanation of how followers will manage snapshots and the > replicated log. This includes the necessary changes when granting or > requesting votes. > 2. How the Snapshot's epoch will be used for the LastFetchEpoch in the > Fetch request. > 3. New configuration options. > 4. Changed the Fetch response to match the latest changes in KIP-595. > 5. Changed the FetchSnapshot request to include total response max bytes. > 6. Changed the FetchSnapshot response to return the snapshot size > instead of the "Continue" field. > > Diff: > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=24=23 > > > 1. There is a comment in the proposal which suggests that we will > maintain > > multiple snapshots: > > > > > Having multiple snapshots is useful for minimizing re-fetching of the > > snapshot when a new snapshot is generated. > > > > However, the document later says that snapshots get deleted as the LBO > > advances. Just wanted to clarify the intent. Will we generally only have > > one snapshot? > > Generally the number of snapshots on disk will be one. I suspect that > users will want some control over this. We can add a configuration > option that doesn't delete, or advances the log begin offset past, the > N latest snapshots. We can set the default value for this > configuration to two. What do you think? > > > 2. The proposal says the following: > > > > > During leader election, followers with incomplete or missing snapshot > > will send a vote request and response as if they had an empty log. > > > > Maybe you can help me understand the scenario we're talking about since > I'm > > not sure I understand the point of this. If the intent is to not allow > such > > a follower to become leader, why would it ever become a candidate? 
On the > > other hand, if the intent is to still allow it to become leader in some > > disaster scenario, then why would it not use its latest log state? For > > inbound Vote requests, I think it should definitely still consider its > > latest log state when deciding whether to grant a vote. > > Conceptually followers will implement this algorithm: > 1. Follower sends fetch request > 2. Leader replies with snapshot epoch and offset > 3. Follower pauses fetch > 4. Follower fetches the snapshot > 5. Follower resume fetch by > A. Setting the LBO to the snapshot offset plus one > B. Setting the LEO or fetch offset in the fetch request to the > snapshot offset plus one > C. Uses the snapshot epoch as the last fetched epoch in the fetch > request. > > The problem I was trying to address is what is the state of the > follower between bullet 4 and 5? Let's assume that the snapshot fetch > in bullet 4 has an epoch of E and an offset of O. The follower can > have the following state on disk after bullet 4: > > 1. A snapshot with offset O and epoch E. > 2. Many snapshots
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Thanks for the feedback Jason. I have made the following changes to the KIP: 1. Better explanation of how followers will manage snapshots and the replicated log. This includes the necessary changes when granting or requesting votes. 2. How the Snapshot's epoch will be used for the LastFetchedEpoch in the Fetch request. 3. New configuration options. 4. Changed the Fetch response to match the latest changes in KIP-595. 5. Changed the FetchSnapshot request to include total response max bytes. 6. Changed the FetchSnapshot response to return the snapshot size instead of the "Continue" field. Diff: https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=24=23 > 1. There is a comment in the proposal which suggests that we will maintain > multiple snapshots: > > > Having multiple snapshots is useful for minimizing re-fetching of the > snapshot when a new snapshot is generated. > > However, the document later says that snapshots get deleted as the LBO > advances. Just wanted to clarify the intent. Will we generally only have > one snapshot? Generally the number of snapshots on disk will be one. I suspect that users will want some control over this. We can add a configuration option that doesn't delete, or advance the log begin offset past, the N latest snapshots. We can set the default value for this configuration to two. What do you think? > 2. The proposal says the following: > > > During leader election, followers with incomplete or missing snapshot > will send a vote request and response as if they had an empty log. > > Maybe you can help me understand the scenario we're talking about since I'm > not sure I understand the point of this. If the intent is to not allow such > a follower to become leader, why would it ever become a candidate? On the > other hand, if the intent is to still allow it to become leader in some > disaster scenario, then why would it not use its latest log state?
For > inbound Vote requests, I think it should definitely still consider its > latest log state when deciding whether to grant a vote. Conceptually followers will implement this algorithm: 1. Follower sends fetch request 2. Leader replies with snapshot epoch and offset 3. Follower pauses fetch 4. Follower fetches the snapshot 5. Follower resumes fetch by A. Setting the LBO to the snapshot offset plus one B. Setting the LEO or fetch offset in the fetch request to the snapshot offset plus one C. Using the snapshot epoch as the last fetched epoch in the fetch request. The problem I was trying to address is what is the state of the follower between bullets 4 and 5? Let's assume that the snapshot fetch in bullet 4 has an epoch of E and an offset of O. The follower can have the following state on disk after bullet 4: 1. A snapshot with offset O and epoch E. 2. Many snapshots older/less than offset O and epoch E. 3. A replicated log with LEO older/less than offset O and epoch E. In this case when the follower grants a vote or becomes a candidate it should use the latest of all of these, which is (1) the fetched snapshot with offset O and epoch E. I updated the KIP to include this description. > 3. Are we overloading `replica.fetch.max.bytes` for snapshot fetches as > well? It looks like we are specifying this at the partition level, but it > might be more useful to track the maximum bytes at the request level. On a > related note, it might be useful to think through heuristics for balancing > between the requests in a partition. Unlike fetches, it seems like we'd > want to complete snapshot loading partition by partition. I wonder if it > would be simpler for FetchSnapshot to handle just one partition. We could use the same configuration we have for Fetch but to avoid confusion let's add two more configurations for "replica.fetch.snapshot.max.bytes" and "replica.fetch.snapshot.response.max.bytes". > 4. 
It would help if the document motivated the need to track the snapshot > epoch. Since we are only snapshotting below the high watermark, are you > thinking about recovering from data loss scenarios? I added the following paragraph to the KIP: The snapshot epoch will be used when ordering snapshots and more importantly when setting the LastFetchedEpoch in the Fetch request. It is possible for a follower to have a snapshot and an empty log. In this case the follower will use the epoch of the snapshot when setting the LastFetchedEpoch in the Fetch request. > > 5. Might need to fix the following: > > > Otherwise, the leader will respond with the offset and epoch of the > latest snapshot (y, c) and with the next fetch offset and epoch (y + 1, d) > > We ended up renaming the next fetch offset and epoch. I think we should > just leave it empty in this case. The snapshot offset and epoch seem > sufficient. Done. I made some changes to the "Handling Fetch Response" section too.
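The follower algorithm and the LastFetchedEpoch behavior described above can be sketched as a small Python model. This is purely illustrative — `FollowerState` and its fields are hypothetical names for this sketch, not classes from the Kafka implementation:

```python
from dataclasses import dataclass

@dataclass
class FollowerState:
    # Hypothetical model of a follower's on-disk state (not Kafka's real classes).
    log_begin_offset: int
    log_end_offset: int
    log_last_epoch: int
    snapshot_offset: int = -1  # offset O of the latest fetched snapshot, -1 if none
    snapshot_epoch: int = -1   # epoch E of the latest fetched snapshot

    def resume_fetch(self, snapshot_offset: int, snapshot_epoch: int):
        """Step 5: resume fetching after downloading snapshot (O, E)."""
        self.snapshot_offset = snapshot_offset
        self.snapshot_epoch = snapshot_epoch
        self.log_begin_offset = snapshot_offset + 1  # 5A: LBO = O + 1
        fetch_offset = snapshot_offset + 1           # 5B: fetch offset = O + 1
        last_fetched_epoch = snapshot_epoch          # 5C: LastFetchedEpoch = E
        return fetch_offset, last_fetched_epoch

    def vote_state(self):
        """Offset/epoch to use when granting a vote or becoming a candidate.

        Between steps 4 and 5 the replicated log's LEO can be older than the
        fetched snapshot, so take the latest of the two. This also covers a
        follower with a snapshot and an empty log.
        """
        if self.snapshot_offset >= self.log_end_offset:
            return self.snapshot_offset, self.snapshot_epoch
        return self.log_end_offset, self.log_last_epoch

# A follower whose log (LEO = 40, epoch 2) is behind a just-fetched
# snapshot with offset O = 100 and epoch E = 3:
follower = FollowerState(log_begin_offset=0, log_end_offset=40, log_last_epoch=2)
follower.snapshot_offset, follower.snapshot_epoch = 100, 3
assert follower.vote_state() == (100, 3)          # vote with the snapshot state
assert follower.resume_fetch(100, 3) == (101, 3)  # next fetch offset and epoch
```

The `vote_state` comparison is the point of the discussion above: the fetched snapshot, not the stale log, supplies the offset and epoch used for votes.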
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Hi, Jose, Thanks for the updated KIP. A few more comments below. 20. "Metadata Cache: The component that generates snapshots, reads snapshots and reads logs for observer replicas of the topic partition __cluster_metadata." It seems this is needed on every broker, not just observers? 21. Our current convention is to use exclusive offset for naming checkpoint files. For example, a producer snapshot file of 1234.snapshot means that the file includes the producer state up to, but not including offset 1234. So, we probably want to follow the same convention for the new checkpoint file. 22. Snapshot Format: KIP-631 only defines the format for individual records. It seems that we need to define the container format here. For example, we need to store the length of each record. Also, does the snapshot file need a CRC field? 23. Could we provide the default value for the new configs controller.snapshot.minimum.records and max.replication.lag.ms. Also, max.replication.lag.ms seems to just control the snapshot frequency by time and not directly relate to replication. So, maybe it should be called sth like controller.snapshot.minimum.interval.ms? 24. "Kafka allows the clients to delete records that are less than a given offset by using the DeleteRecords RPC . Those requests will be validated using the same logic enumerated above." Hmm, should we allow deleteRecord on the metadata topic? If we do, does it trim the snapshot accordingly? 25. "The followers of the __cluster_metadata topic partition will concurrently fetch the snapshot and replicated log. This means that candidates with incomplete snapshots will send a vote request with a LastOffsetEpoch of -1 and a LastOffset of -1 no matter the LEO of the replicated log." My understanding is that a follower will either fetch from the snapshot or the log, but not both at the same time. Could you explain how the concurrent part works? Also, what's an incomplete snapshot? 26. 
FetchRequest: 26.1 Handling Fetch Request: I agree with Jason that SnapshotOffsetAndEpoch already tells us the next offset to fetch. So, we don't need to set NextOffsetAndEpoch in the response. 26.2 Is there a reason to rename LogStartOffset to LogBeginOffset? I am not sure if they are truly identical semantically. For example, currently, the follower moves its logStartOffset based on the leader's. Will we do the same thing with LogBeginOffset? 27. FetchSnapshotRequest: It seems that SnapshotOffsetAndEpoch shouldn't be optional. Also, its version number 12 is incorrect. 28. FetchSnapshotResponse: Do we need the position field? It seems it's the same as in the request. 29. "OFFSET_OUT_OF_RANGE - when the fetch snapshot request’s offset is greater than the size of the snapshot." By offset, do you mean position? 30. It's possible for a broker to die while copying the snapshot file from the leader or saving its locally generated snapshot. On restart, how does the broker know whether a local snapshot file is complete or not? Thanks, Jun On Fri, Sep 18, 2020 at 1:38 PM Jason Gustafson wrote: > Hi Jose, > > A few comments/questions below: > > 1. There is a comment in the proposal which suggests that we will maintain > multiple snapshots: > > > Having multiple snapshots is useful for minimizing re-fetching of the > snapshot when a new snapshot is generated. > > However, the document later says that snapshots get deleted as the LBO > advances. Just wanted to clarify the intent. Will we generally only have > one snapshot? > > 2. The proposal says the following: > > > During leader election, followers with incomplete or missing snapshot > will send a vote request and response as if they had an empty log. > > Maybe you can help me understand the scenario we're talking about since I'm > not sure I understand the point of this. If the intent is to not allow such > a follower to become leader, why would it ever become a candidate? 
On the > other hand, if the intent is to still allow it to become leader in some > disaster scenario, then why would it not use its latest log state? For > inbound Vote requests, I think it should definitely still consider its > latest log state when deciding whether to grant a vote. > > 3. Are we overloading `replica.fetch.max.bytes` for snapshot fetches as > well? It looks like we are specifying this at the partition level, but it > might be more useful to track the maximum bytes at the request level. On a > related note, it might be useful to think through heuristics for balancing > between the requests in a partition. Unlike fetches, it seems like we'd > want to complete snapshot loading partition by partition. I wonder if it > would be simpler for FetchSnapshot to handle just one partition. > > 4. It would help if the document motivated the need to track the snapshot > epoch. Since we are only snapshotting below the high watermark, are you > thinking about recovering from data loss scenarios? > > 5. Might need to fix the following: > > > Otherwise, the leader will
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Hi Jose, A few comments/questions below: 1. There is a comment in the proposal which suggests that we will maintain multiple snapshots: > Having multiple snapshots is useful for minimizing re-fetching of the snapshot when a new snapshot is generated. However, the document later says that snapshots get deleted as the LBO advances. Just wanted to clarify the intent. Will we generally only have one snapshot? 2. The proposal says the following: > During leader election, followers with incomplete or missing snapshot will send a vote request and response as if they had an empty log. Maybe you can help me understand the scenario we're talking about since I'm not sure I understand the point of this. If the intent is to not allow such a follower to become leader, why would it ever become a candidate? On the other hand, if the intent is to still allow it to become leader in some disaster scenario, then why would it not use its latest log state? For inbound Vote requests, I think it should definitely still consider its latest log state when deciding whether to grant a vote. 3. Are we overloading `replica.fetch.max.bytes` for snapshot fetches as well? It looks like we are specifying this at the partition level, but it might be more useful to track the maximum bytes at the request level. On a related note, it might be useful to think through heuristics for balancing between the requests in a partition. Unlike fetches, it seems like we'd want to complete snapshot loading partition by partition. I wonder if it would be simpler for FetchSnapshot to handle just one partition. 4. It would help if the document motivated the need to track the snapshot epoch. Since we are only snapshotting below the high watermark, are you thinking about recovering from data loss scenarios? 5. 
Might need to fix the following: > Otherwise, the leader will respond with the offset and epoch of the latest snapshot (y, c) and with the next fetch offset and epoch (y + 1, d) We ended up renaming the next fetch offset and epoch. I think we should just leave it empty in this case. The snapshot offset and epoch seem sufficient. Thanks, Jason On Fri, Aug 7, 2020 at 11:33 AM Jose Garcia Sancio wrote: > Thanks for your feedback Jun. > > Here are my changes to the KIP: > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=21=20 > > My comments are below... > > On Wed, Aug 5, 2020 at 1:59 PM Jun Rao wrote: > > > > Hi, Jose, > > > > Thanks for the KIP. A few comments below. > > > > 10. I agree with Jason that it's useful to document the motivation a bit > > clearer. Regarding semantic/performance, one benefit of snapshotting is > > that it allows changes to be encoded incrementally instead of using the > > full post image. For example, in KIP-631, each partition has multiple > > fields like assigned replicas, leader, epoch, isr, etc. If only isr is > > changed, the snapshotting approach allows the change to be represented > with > > just the new value in isr. Compaction will require all existing fields to > > be included in order to represent just an isr change. This is just > because > > we can customize the combining logic with snapshotting. > > > > Yes. Right now the IsrChange record from KIP-631 has both the ISR and > the leader and epoch. I think we can split this record into two > records: > 1. ISR change that includes the topic, partition, and isr. > 2. Leader change that includes the topic, partition, leader and leader > epoch. > I'll bring this up in the discussion thread for that KIP. > > > As for the > > performance benefit, I guess in theory snapshotting allows the snapshot > to > > be updated in-place incrementally without having to read the full state > in > > the snapshot. 
BTW, during compaction, we only read the cleaned data once > > instead of 3 times. > > > > Doesn't compaction need to read the clean records to compare if the > key is in the map of keys to offset? I made the following changes to > the KIP: > > 2. With log compaction the broker needs to > a. read 1MB/s from the head of the log to update the in-memory state > b. read 1MB/s to update the map of keys to offsets > c. read 3MB/s (100MB from the already compacted log, 50MB from the > new key-value records) from the older segments. The log will > accumulate 50MB, 50 seconds worth of changes, before compacting > because the default configuration has a minimum clean ratio of 50%. > > The "100MB in the already compacted log" are these cleaned records. > Let me know what you think and if I am missing something. > > > 11. The KIP mentions topic id. Currently there is no topic id. Does this > > KIP depend on KIP-516? > > > > For the purpose of measuring the impact, I was using the records > proposed by KIP-631. This KIP doesn't depend on KIP-516 or KIP-631 for > its design and implementation. I was just referencing that KIP in the > motivation and analysis. The KIP only assumes the changes in KIP-595 > which has been approved but
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Thanks for your feedback Jun. Here are my changes to the KIP: https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=21=20 My comments are below... On Wed, Aug 5, 2020 at 1:59 PM Jun Rao wrote: > > Hi, Jose, > > Thanks for the KIP. A few comments below. > > 10. I agree with Jason that it's useful to document the motivation a bit > clearer. Regarding semantic/performance, one benefit of snapshotting is > that it allows changes to be encoded incrementally instead of using the > full post image. For example, in KIP-631, each partition has multiple > fields like assigned replicas, leader, epoch, isr, etc. If only isr is > changed, the snapshotting approach allows the change to be represented with > just the new value in isr. Compaction will require all existing fields to > be included in order to represent just an isr change. This is just because > we can customize the combining logic with snapshotting. > Yes. Right now the IsrChange record from KIP-631 has both the ISR and the leader and epoch. I think we can split this record into two records: 1. ISR change that includes the topic, partition, and isr. 2. Leader change that includes the topic, partition, leader and leader epoch. I'll bring this up in the discussion thread for that KIP. > As for the > performance benefit, I guess in theory snapshotting allows the snapshot to > be updated in-place incrementally without having to read the full state in > the snapshot. BTW, during compaction, we only read the cleaned data once > instead of 3 times. > Doesn't compaction need to read the clean records to compare if the key is in the map of keys to offset? I made the following changes to the KIP: 2. With log compaction the broker needs to a. read 1MB/s from the head of the log to update the in-memory state b. read 1MB/s to update the map of keys to offsets c. read 3MB/s (100MB from the already compacted log, 50MB from the new key-value records) from the older segments. 
The log will accumulate 50MB, 50 seconds worth of changes, before compacting because the default configuration has a minimum clean ratio of 50%. The "100MB in the already compacted log" are these cleaned records. Let me know what you think and if I am missing something. > 11. The KIP mentions topic id. Currently there is no topic id. Does this > KIP depend on KIP-516? > For the purpose of measuring the impact, I was using the records proposed by KIP-631. This KIP doesn't depend on KIP-516 or KIP-631 for its design and implementation. I was just referencing that KIP in the motivation and analysis. The KIP only assumes the changes in KIP-595 which has been approved but is not part of trunk yet. In the overview section the KIP mentions: "This KIP assumes that KIP-595 has been approved and implemented." > 12. Is there a need to keep more than 1 snapshot? It seems we always expose > the latest snapshot to clients. > The KIP proposes keeping more than one snapshot to not invalidate any pending/concurrent `FetchSnapshot` requests that are attempting to fetch a snapshot that can be deleted. I'll remove this wording as the first version of this implementation will probably not have this feature as it requires extra coordination. The implementation will still allow for multiple snapshots because generating a snapshot is not atomic with respect to increasing the LBO. > 13. "During leader election, followers with incomplete or missing snapshot > will send a vote request and response as if they had an empty log." Hmm, a > follower may not have a snapshot created, but that doesn't imply its log is > empty. > Yes. I fixed the "Validation of Snapshot and Log" section and that sentence. I basically added an additional condition where a snapshot is not required if the LBO is 0. > 14. "LBO is max.replication.lag.ms old." Not sure that I follow. How do we > compare an offset to a time? > Yeah. This may be hard to implement. 
I am trying to avoid invalidating followers and observers by aggressively deleting an offset/record which they are trying to fetch. It is possible that `controller.snapshot.minimum.records` is good enough to throttle increasing LBO. > 15. "Followers and observers will increase their log begin offset to the > value sent on the fetch response as long as the local state machine has > generated a snapshot that includes to the log begin offset minus one." Does > the observer store the log? I thought it only needed to maintain a > snapshot. If so, the observer doesn't need to maintain LBO. > In KIP-595 observers are similar to voters/followers in that they Fetch the log and the snapshot. Two of the distinctions are that they don't participate in the leader election and they are not included when computing the high-watermark. Regarding storing: it is possible that observers never need to store the log since they don't vote or become leaders. I think in the future we would like to implement "KIP-642: Dynamic quorum reassignment" which would add the capability to
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Hi, Jose, Thanks for the KIP. A few comments below. 10. I agree with Jason that it's useful to document the motivation a bit clearer. Regarding semantic/performance, one benefit of snapshotting is that it allows changes to be encoded incrementally instead of using the full post image. For example, in KIP-631, each partition has multiple fields like assigned replicas, leader, epoch, isr, etc. If only isr is changed, the snapshotting approach allows the change to be represented with just the new value in isr. Compaction will require all existing fields to be included in order to represent just an isr change. This is just because we can customize the combining logic with snapshotting. As for the performance benefit, I guess in theory snapshotting allows the snapshot to be updated in-place incrementally without having to read the full state in the snapshot. BTW, during compaction, we only read the cleaned data once instead of 3 times. 11. The KIP mentions topic id. Currently there is no topic id. Does this KIP depend on KIP-516? 12. Is there a need to keep more than 1 snapshot? It seems we always expose the latest snapshot to clients. 13. "During leader election, followers with incomplete or missing snapshot will send a vote request and response as if they had an empty log." Hmm, a follower may not have a snapshot created, but that doesn't imply its log is empty. 14. "LBO is max.replication.lag.ms old." Not sure that I follow. How do we compare an offset to a time? 15. "Followers and observers will increase their log begin offset to the value sent on the fetch response as long as the local state machine has generated a snapshot that includes to the log begin offset minus one." Does the observer store the log? I thought it only needed to maintain a snapshot. If so, the observer doesn't need to maintain LBO. 16. "There are two cases when the Kafka Controller needs to load the snapshot: When it is booting. 
When the follower and observer replicas finishes fetching a new snapshot from the leader." For faster failover, it seems it's useful for a non-controller voter to maintain the in-memory metadata state. In order to do that, it seems that every voter needs to load the snapshot on booting? 17. "There is an invariant that every log must have at least one snapshot where the offset of the snapshot is between LogBeginOffset - 1 and High-Watermark. If this is not the case then the replica should assume that the log is empty." Does that invariant hold when there is no snapshot initially? 18. "The __cluster_metadata topic will have an cleanup.policy value of snapshot" Is there a need to make this configurable if it's read-only? 19. OFFSET_OUT_OF_RANGE in FetchSnapshotResponse: It seems that POSITION_OUT_OF_RANGE is more appropriate? Thanks, Jun On Wed, Aug 5, 2020 at 12:13 PM Jose Garcia Sancio wrote: > Once again, thanks for the feedback Jason, > > My changes to the KIP are here: > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=18=17 > > And see my comments below... > > On Mon, Aug 3, 2020 at 1:57 PM Jason Gustafson wrote: > > > > Hi Jose, > > > > Thanks for the proposal. I think there are three main motivations for > > snapshotting over the existing compaction semantics. > > > > First we are arguing that compaction is a poor semantic fit for how we > want > > to model the metadata in the cluster. We are trying to view the changes > in > > the cluster as a stream of events, not necessarily as a stream of > key/value > > updates. The reason this is useful is that a single event may correspond > to > > a set of key/value updates. We don't need to delete each partition > > individually for example if we are deleting the full topic. Outside of > > deletion, however, the benefits of this approach are less obvious. I am > > wondering if there are other cases where the event-based approach has > some > > benefit? > > > > Yes. 
Another example of this is what KIP-631 calls FenceBroker. In the > current implementation of the Kafka Controller and the implementation > proposed in > KIP-631, whenever a broker is fenced the controller removes the broker > from the ISR and performs leader election if necessary. The impact of > this operation on replication is documented in section "Amount of Data > Replicated". I have also updated the KIP to reflect this. > > > The second motivation is from the perspective of consistency. Basically > we > > don't like the existing solution for the tombstone deletion problem, > which > > is just to add a delay before removal. The case we are concerned about > > requires a replica to fetch up to a specific offset and then stall for a > > time which is longer than the deletion retention timeout. If this > happens, > > then the replica might not see the tombstone, which would lead to an > > inconsistent state. I think we are already talking about a rare case, > but I > > wonder if there are simple ways to tighten it further. For the sake of > >
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Once again, thanks for the feedback Jason, My changes to the KIP are here: https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=18=17 And see my comments below... On Mon, Aug 3, 2020 at 1:57 PM Jason Gustafson wrote: > > Hi Jose, > > Thanks for the proposal. I think there are three main motivations for > snapshotting over the existing compaction semantics. > > First we are arguing that compaction is a poor semantic fit for how we want > to model the metadata in the cluster. We are trying to view the changes in > the cluster as a stream of events, not necessarily as a stream of key/value > updates. The reason this is useful is that a single event may correspond to > a set of key/value updates. We don't need to delete each partition > individually for example if we are deleting the full topic. Outside of > deletion, however, the benefits of this approach are less obvious. I am > wondering if there are other cases where the event-based approach has some > benefit? > Yes. Another example of this is what KIP-631 calls FenceBroker. In the current implementation of the Kafka Controller and the implementation proposed in KIP-631, whenever a broker is fenced the controller removes the broker from the ISR and performs leader election if necessary. The impact of this operation on replication is documented in section "Amount of Data Replicated". I have also updated the KIP to reflect this. > The second motivation is from the perspective of consistency. Basically we > don't like the existing solution for the tombstone deletion problem, which > is just to add a delay before removal. The case we are concerned about > requires a replica to fetch up to a specific offset and then stall for a > time which is longer than the deletion retention timeout. If this happens, > then the replica might not see the tombstone, which would lead to an > inconsistent state. 
I think we are already talking about a rare case, but I > wonder if there are simple ways to tighten it further. For the sake of > argument, what if we had the replica start over from the beginning whenever > there is a replication delay which is longer than tombstone retention time? > Just want to be sure we're not missing any simple/pragmatic solutions > here... > We explore the changes needed to log compaction and the fetch protocol such that it results in a consistent replicated log in the rejected sections. I changed the KIP to also mention it in the motivation section by adding a section called "Consistent Log and Tombstones" > Finally, I think we are arguing that compaction gives a poor performance > tradeoff when the state is already in memory. It requires us to read and > replay all of the changes even though we already know the end result. One > way to think about it is that compaction works O(the rate of changes) while > snapshotting is O(the size of data). Contrarily, the nice thing about > compaction is that it works irrespective of the size of the data, which > makes it a better fit for user partitions. I feel like this might be an > argument we can make empirically or at least with back-of-the-napkin > calculations. If we assume a fixed size of data and a certain rate of > change, then what are the respective costs of snapshotting vs compaction? I > think compaction fares worse as the rate of change increases. In the case > of __consumer_offsets, which sometimes has to support a very high rate of > offset commits, I think snapshotting would be a great tradeoff to reduce > load time on coordinator failover. The rate of change for metadata on the > other hand might not be as high, though it can be very bursty. > This is a very good observation. 
If you assume that the number of keys doesn't change but that we have frequent updates to their values then I think that after log compaction the size of the compacted section of the log is O(size of the data) + O(size of the tombstones). And as you point out the size of the snapshot is also O(size of the data). I think this is a reasonable assumption for topics like __cluster_metadata and __consumer_offsets. The difference is the number of reads required. With in-memory snapshots we only need to read the log once. With log compaction we need to read the log 3 times: 1. to update the in-memory state, 2. to generate the map of keys to offsets, and 3. to compact the log using the map of keys to offsets. I have updated the KIP and go into a lot more detail in the section "Loading State and Frequency of Compaction". -- -Jose
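The read amplification argument above can be checked with a quick back-of-the-napkin calculation. This is a sketch using the assumed figures from this thread — 1MB/s of changes, a 100MB compacted log, and the default 50% minimum clean ratio — not measurements:

```python
# Assumed figures from the discussion (illustrative, not measured):
write_rate_mb_s = 1.0     # rate of new metadata changes
compacted_log_mb = 100.0  # size of the already compacted log
min_clean_ratio = 0.5     # default minimum clean ratio

# With a 50% clean ratio the log accumulates dirty data equal to half the
# compacted size before a compaction runs:
dirty_mb = compacted_log_mb * min_clean_ratio                 # 50 MB
seconds_between_compactions = dirty_mb / write_rate_mb_s      # 50 s

# Reads per compaction cycle with log compaction:
head_read_mb = dirty_mb     # (a) head of the log, to update the in-memory state
map_read_mb = dirty_mb      # (b) build the map of keys to offsets
compact_read_mb = compacted_log_mb + dirty_mb  # (c) old segments + new records

compaction_read_rate = (
    head_read_mb + map_read_mb + compact_read_mb
) / seconds_between_compactions

# In-memory snapshotting only reads the log once, to update the state:
snapshot_read_rate = write_rate_mb_s

print(compaction_read_rate)  # 5.0 MB/s total: 1 + 1 + 3 from the KIP's breakdown
print(snapshot_read_rate)    # 1.0 MB/s
```

The 5.0 MB/s total matches the 1MB/s + 1MB/s + 3MB/s breakdown quoted earlier in the thread.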
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Thanks for your feedback Jason. I'll have a more detailed reply and update to the KIP by EOD today. On Mon, Aug 3, 2020 at 1:57 PM Jason Gustafson wrote: > > Hi Jose, > > Thanks for the proposal. I think there are three main motivations for > snapshotting over the existing compaction semantics. > > First we are arguing that compaction is a poor semantic fit for how we want > to model the metadata in the cluster. We are trying to view the changes in > the cluster as a stream of events, not necessarily as a stream of key/value > updates. The reason this is useful is that a single event may correspond to > a set of key/value updates. We don't need to delete each partition > individually for example if we are deleting the full topic. Outside of > deletion, however, the benefits of this approach are less obvious. I am > wondering if there are other cases where the event-based approach has some > benefit? > > The second motivation is from the perspective of consistency. Basically we > don't like the existing solution for the tombstone deletion problem, which > is just to add a delay before removal. The case we are concerned about > requires a replica to fetch up to a specific offset and then stall for a > time which is longer than the deletion retention timeout. If this happens, > then the replica might not see the tombstone, which would lead to an > inconsistent state. I think we are already talking about a rare case, but I > wonder if there are simple ways to tighten it further. For the sake of > argument, what if we had the replica start over from the beginning whenever > there is a replication delay which is longer than tombstone retention time? > Just want to be sure we're not missing any simple/pragmatic solutions > here... > > Finally, I think we are arguing that compaction gives a poor performance > tradeoff when the state is already in memory. It requires us to read and > replay all of the changes even though we already know the end result. 
One > way to think about it is that compaction works O(the rate of changes) while > snapshotting is O(the size of data). Contrarily, the nice thing about > compaction is that it works irrespective of the size of the data, which > makes it a better fit for user partitions. I feel like this might be an > argument we can make empirically or at least with back-of-the-napkin > calculations. If we assume a fixed size of data and a certain rate of > change, then what are the respective costs of snapshotting vs compaction? I > think compaction fares worse as the rate of change increases. In the case > of __consumer_offsets, which sometimes has to support a very high rate of > offset commits, I think snapshotting would be a great tradeoff to reduce > load time on coordinator failover. The rate of change for metadata on the > other hand might not be as high, though it can be very bursty. > > Thanks, > Jason > > > On Wed, Jul 29, 2020 at 2:03 PM Jose Garcia Sancio > wrote: > > > Thanks Ron for the additional comments and suggestions. > > > > Here are the changes to the KIP: > > > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=17=15 > > > > On Wed, Jul 29, 2020 at 8:44 AM Ron Dagostino wrote: > > > > > > Thanks, Jose. It's looking good. Here is one minor correction: > > > > > > <<< If the Kafka topic partition leader receives a fetch request with an > > > offset and epoch greater than or equal to the LBO (x + 1, a) > > > >>> If the Kafka topic partition leader receives a fetch request with an > > > offset and epoch greater than or equal to the LBO (x + 1, b) > > > > > > > Done. > > > > > Here is one more question. Is there an ability to evolve the snapshot > > > format over time, and if so, how is that managed for upgrades? It would > > be > > > both Controllers and Brokers that would depend on the format, correct? 
> > > Those could be the same thing if the controller was running inside the > > > broker JVM, but that is an option rather than a requirement, I think. > > > Might the Controller upgrade have to be coordinated with the broker > > upgrade > > > in the separate-JVM case? Perhaps a section discussing this would be > > > appropriate? > > > > The content sent through the FetchSnapshot RPC is expected to be > > compatible with future changes. In KIP-631 the Kafka Controller is > > going to use the existing Kafka Message and versioning scheme. > > Specifically see section "Record Format Versions". I added some > > wording around this. > > > > Thanks! > > -Jose > > -- -Jose
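As a rough illustration of the versioning point above — a hypothetical framing invented for this sketch, not Kafka's actual record format — a snapshot record could carry an explicit version so that readers stay compatible as the schema evolves:

```python
import struct

def encode_record(version: int, payload: bytes) -> bytes:
    # Hypothetical framing: 2-byte version, 4-byte payload length, then payload.
    return struct.pack(">hi", version, len(payload)) + payload

def decode_record(buf: bytes):
    version, length = struct.unpack_from(">hi", buf, 0)
    payload = buf[6:6 + length]
    # A reader that only understands versions up to 1 can still detect
    # (and skip or reject) newer records, because the framing is stable.
    supported = version <= 1
    return version, payload, supported

frame = encode_record(0, b"snapshot-header")
print(decode_record(frame))  # (0, b'snapshot-header', True)
```

The design choice this mimics is the one named in the email: keep the envelope fixed and version the contents, so controllers and brokers on different versions can still parse each other's snapshots.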
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Hi Jose, Thanks for the proposal. I think there are three main motivations for snapshotting over the existing compaction semantics. First we are arguing that compaction is a poor semantic fit for how we want to model the metadata in the cluster. We are trying to view the changes in the cluster as a stream of events, not necessarily as a stream of key/value updates. The reason this is useful is that a single event may correspond to a set of key/value updates. We don't need to delete each partition individually for example if we are deleting the full topic. Outside of deletion, however, the benefits of this approach are less obvious. I am wondering if there are other cases where the event-based approach has some benefit? The second motivation is from the perspective of consistency. Basically we don't like the existing solution for the tombstone deletion problem, which is just to add a delay before removal. The case we are concerned about requires a replica to fetch up to a specific offset and then stall for a time which is longer than the deletion retention timeout. If this happens, then the replica might not see the tombstone, which would lead to an inconsistent state. I think we are already talking about a rare case, but I wonder if there are simple ways to tighten it further. For the sake of argument, what if we had the replica start over from the beginning whenever there is a replication delay which is longer than tombstone retention time? Just want to be sure we're not missing any simple/pragmatic solutions here... Finally, I think we are arguing that compaction gives a poor performance tradeoff when the state is already in memory. It requires us to read and replay all of the changes even though we already know the end result. One way to think about it is that compaction works O(the rate of changes) while snapshotting is O(the size of data). 
Contrarily, the nice thing about compaction is that it works irrespective of the size of the data, which makes it a better fit for user partitions. I feel like this might be an argument we can make empirically or at least with back-of-the-napkin calculations. If we assume a fixed size of data and a certain rate of change, then what are the respective costs of snapshotting vs compaction? I think compaction fares worse as the rate of change increases. In the case of __consumer_offsets, which sometimes has to support a very high rate of offset commits, I think snapshotting would be a great tradeoff to reduce load time on coordinator failover. The rate of change for metadata on the other hand might not be as high, though it can be very bursty. Thanks, Jason On Wed, Jul 29, 2020 at 2:03 PM Jose Garcia Sancio wrote: > Thanks Ron for the additional comments and suggestions. > > Here are the changes to the KIP: > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=17=15 > > On Wed, Jul 29, 2020 at 8:44 AM Ron Dagostino wrote: > > > > Thanks, Jose. It's looking good. Here is one minor correction: > > > > <<< If the Kafka topic partition leader receives a fetch request with an > > offset and epoch greater than or equal to the LBO (x + 1, a) > > >>> If the Kafka topic partition leader receives a fetch request with an > > offset and epoch greater than or equal to the LBO (x + 1, b) > > > > Done. > > > Here is one more question. Is there an ability to evolve the snapshot > > format over time, and if so, how is that managed for upgrades? It would > be > > both Controllers and Brokers that would depend on the format, correct? > > Those could be the same thing if the controller was running inside the > > broker JVM, but that is an option rather than a requirement, I think. > > Might the Controller upgrade have to be coordinated with the broker > upgrade > > in the separate-JVM case? Perhaps a section discussing this would be > > appropriate? 
> > > > The content sent through the FetchSnapshot RPC is expected to be > compatible with future changes. In KIP-631 the Kafka Controller is > going to use the existing Kafka Message and versioning scheme. > Specifically see section "Record Format Versions". I added some > wording around this. > > Thanks! > -Jose >
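Jason's "compaction works O(the rate of changes) while snapshotting is O(the size of data)" comparison earlier in the thread can be made concrete with a back-of-the-napkin sketch. The numbers below (state size, change count, replay and read throughput) are illustrative assumptions, not measurements from the KIP:

```python
# Back-of-the-napkin comparison of compacted-log replay vs snapshot load
# on controller/coordinator failover. All inputs are illustrative.

def replay_cost_seconds(num_changes: int, changes_per_second: float) -> float:
    """Compacted log: a restarting replica replays every retained change,
    so load time grows with the number (rate) of changes."""
    return num_changes / changes_per_second

def snapshot_cost_seconds(state_bytes: int, bytes_per_second: float) -> float:
    """Snapshot: load time grows with the size of the state, regardless of
    how many changes produced it."""
    return state_bytes / bytes_per_second

# Assume 10 MB of metadata state read at 100 MB/s.
snapshot_secs = snapshot_cost_seconds(10 * 1024 * 1024, 100 * 1024 * 1024)

# Assume a bursty workload that left 50 million changes, replayed at 1M/s.
replay_secs = replay_cost_seconds(50_000_000, 1_000_000)

# Snapshotting wins as the rate of change grows relative to state size.
print(f"snapshot load: {snapshot_secs:.2f}s, log replay: {replay_secs:.2f}s")
```

As in the __consumer_offsets example, the higher the commit rate relative to the state size, the more the snapshot approach pays off.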
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Thanks Ron for the additional comments and suggestions. Here are the changes to the KIP: https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=17=15 On Wed, Jul 29, 2020 at 8:44 AM Ron Dagostino wrote: > > Thanks, Jose. It's looking good. Here is one minor correction: > > <<< If the Kafka topic partition leader receives a fetch request with an > offset and epoch greater than or equal to the LBO (x + 1, a) > >>> If the Kafka topic partition leader receives a fetch request with an > offset and epoch greater than or equal to the LBO (x + 1, b) > Done. > Here is one more question. Is there an ability to evolve the snapshot > format over time, and if so, how is that managed for upgrades? It would be > both Controllers and Brokers that would depend on the format, correct? > Those could be the same thing if the controller was running inside the > broker JVM, but that is an option rather than a requirement, I think. > Might the Controller upgrade have to be coordinated with the broker upgrade > in the separate-JVM case? Perhaps a section discussing this would be > appropriate? > The content sent through the FetchSnapshot RPC is expected to be compatible with future changes. In KIP-631 the Kafka Controller is going to use the existing Kafka Message and versioning scheme. Specifically see section "Record Format Versions". I added some wording around this. Thanks! -Jose
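The versioned snapshot control records discussed above (and summarized at the top of the thread: a snapshot header record carrying the append time of the last log record included in the snapshot) support both format evolution and time-based snapshot deletion. A minimal sketch, with illustrative field names rather than Kafka's actual generated message schema:

```python
# Sketch of a versioned snapshot header record and how its timestamp
# could drive snapshot deletion. Field names (version,
# last_contained_log_timestamp) are illustrative assumptions.
from dataclasses import dataclass

@dataclass(frozen=True)
class SnapshotHeader:
    version: int                       # bumped when the snapshot format evolves
    last_contained_log_timestamp: int  # append time (ms) of the newest record
                                       # from the log included in the snapshot

def is_snapshot_expired(header: SnapshotHeader,
                        now_ms: int, retention_ms: int) -> bool:
    """A snapshot whose newest contained record is older than the
    retention window is a candidate for deletion."""
    return now_ms - header.last_contained_log_timestamp > retention_ms
```

Because the header is versioned like any other Kafka Message, older readers can reject versions they do not understand instead of misparsing the snapshot body.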
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Thanks, Jose. It's looking good. Here is one minor correction: <<< If the Kafka topic partition leader receives a fetch request with an offset and epoch greater than or equal to the LBO (x + 1, a) >>> If the Kafka topic partition leader receives a fetch request with an offset and epoch greater than or equal to the LBO (x + 1, b) Here is one more question. Is there an ability to evolve the snapshot format over time, and if so, how is that managed for upgrades? It would be both Controllers and Brokers that would depend on the format, correct? Those could be the same thing if the controller was running inside the broker JVM, but that is an option rather than a requirement, I think. Might the Controller upgrade have to be coordinated with the broker upgrade in the separate-JVM case? Perhaps a section discussing this would be appropriate? Ron On Tue, Jul 28, 2020 at 11:14 PM Jose Garcia Sancio wrote: > Thanks Ron. Your comments and suggestions were helpful. You can see my > changes to the KIP here: > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=15=14 > > My comments are below... > > On Mon, Jul 27, 2020 at 11:29 AM Ron Dagostino wrote: > > > > Hi Jose. Thanks for the KIP. Here are some questions and some nit > corrections. > > > > <<< In KIP-500 the Kafka Controller, which is the quorum leader from > > KIP-595, will materialize the entries in the metadata log into memory. > > Technically I think the quorum leader is referred to as the Active > > Controller in KIP-500. Maybe replace "Kafka Controller" with "Active > > Controller"? I think the term "Kafka Controller" is fine as used > > throughout the rest of the KIP to refer to the entire thing, but when > > referring specifically to the leader I think "Active Controller" is > > the term that is defined in KIP-500. > > > > Made those changes. 
> > > > > <<< Each broker in KIP-500, which will be a replica of the metadata > > log, will materialize the entries in the log into a metadata cache > > This wording confused me because I assumed that "replica" was a formal > > term and only (non-Active) Controllers are formally "replicas" of the > > metadata log -- Kafka brokers would be clients that read the log and > > then use the data for their own purpose as opposed to formally being > > replicas with this understanding of the term "replica". Is that > > correct, and if so, maybe replace "replica" with "client"? > > > > In KIP-595 we have two types of replicas: voters and observers. Voter > replicas are Kafka Controllers and one of them will become the > Active Controller. Observer replicas fetch from the log and attempt to > keep up with the LEO of the Active Controller. I think you can > consider all of them as clients of the replicated log. > > > > > <<< The type of in-memory state machines what we plan to implement > > >>> The type of in-memory state machines that we plan to implement > > nit > > > > Done. > > > > > <<< doesn't map very well to an key and offset based clean up policy. > > >>> doesn't map very well to a key and offset based clean up policy. > > nit > > > > Done. > > > > > <<< When starting a broker either because it is a new broker, a broker > > was upgraded or a failed broker is restarting. Loading the state > > represented by the __cluster_metadata topic partition is required > > before the broker is available > > >>> When starting a broker either because it is a new broker or it is > restarting, loading the state represented by the __cluster_metadata topic > partition is required before the broker is available. > > Reword for simplicity and clarity? > > > > Done.
> > > > > <<< With snapshot based of the in-memory state Kafka can be much more > aggressive > > >>> By taking and transmitting a snapshot of the in-memory state as > described below Kafka can be much more aggressive > > Tough to refer to the concept of snapshot here without having > > described what it is, so refer to "as described below" to help orient > > the reader? > > > > Made some changes to these sentences. I agree that fully understanding > parts of the motivation section requires reading the rest of the > document. I wanted to make sure that we had this in the motivation > section. > > > > > <<< In the future this mechanism will also be useful for > > high-throughput topic partitions like the Group Coordinator and > > Transaction Coordinator. > > >>> In the future this mechanism may also be useful for high-throughput > topic partitions like the Group Coordinator and Transaction Coordinator. > > Tough to say "will" when that is an assumption that would depend on a > KIP? > > > > Yeah. Changed it. > > > > > << > __cluster_metadata topic partition then the ... Kafka Controller will > > need to replicate 3.81 MB to each broker in the cluster (10) or 38.14 > > MB. > > It might be good to append a sentence that explicitly states how much > > data is replicated for the delta/event -- right now it is implied to > > be
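The replication arithmetic in the example above (3.81 MB of state to each of 10 brokers, about 38.14 MB total, versus a tiny delta per event) can be checked directly. The state size and broker count come from the thread; the 100-byte per-event record size is an illustrative assumption:

```python
# Checking the arithmetic in the example above: replicating the full
# metadata state to every broker vs replicating only a small delta.
# state_mb and broker count are from the thread; delta_bytes is an
# illustrative assumption for a single event record.

state_mb = 3.814   # size of the materialized metadata state
brokers = 10

full_replication_mb = state_mb * brokers            # snapshot to everyone
delta_bytes = 100                                   # one event, e.g. a topic delete
delta_replication_kb = delta_bytes * brokers / 1024

print(f"full: {full_replication_mb:.2f} MB, delta: {delta_replication_kb:.2f} KB")
```

The punch line made explicit: the event/delta approach ships kilobytes where full-state replication ships tens of megabytes.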
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Thanks Ron. Your comments and suggestions were helpful. You can see my changes to the KIP here: https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=15=14 My comments are below... On Mon, Jul 27, 2020 at 11:29 AM Ron Dagostino wrote: > > Hi Jose. Thanks for the KIP. Here are some questions and some nit > corrections. > > <<< In KIP-500 the Kafka Controller, which is the quorum leader from > KIP-595, will materialize the entries in the metadata log into memory. > Technically I think the quorum leader is referred to as the Active > Controller in KIP-500. Maybe replace "Kafka Controller" with "Active > Controller"? I think the term "Kafka Controller" is fine as used > throughout the rest of the KIP to refer to the entire thing, but when > referring specifically to the leader I think "Active Controller" is > the term that is defined in KIP-500. > Made those changes. > > <<< Each broker in KIP-500, which will be a replica of the metadata > log, will materialize the entries in the log into a metadata cache > This wording confused me because I assumed that "replica" was a formal > term and only (non-Active) Controllers are formally "replicas" of the > metadata log -- Kafka brokers would be clients that read the log and > then use the data for their own purpose as opposed to formally being > replicas with this understanding of the term "replica". Is that > correct, and if so, maybe replace "replica" with "client"? > In KIP-595 we have two types of replicas: voters and observers. Voter replicas are Kafka Controllers and one of them will become the Active Controller. Observer replicas fetch from the log and attempt to keep up with the LEO of the Active Controller. I think you can consider all of them as clients of the replicated log. > > <<< The type of in-memory state machines what we plan to implement > >>> The type of in-memory state machines that we plan to implement > nit > Done.
> > <<< doesn't map very well to an key and offset based clean up policy. > >>> doesn't map very well to a key and offset based clean up policy. > nit > Done. > > <<< When starting a broker either because it is a new broker, a broker > was upgraded or a failed broker is restarting. Loading the state > represented by the __cluster_metadata topic partition is required > before the broker is available > >>> When starting a broker either because it is a new broker or it is > >>> restarting, loading the state represented by the __cluster_metadata topic > >>> partition is required before the broker is available. > Reword for simplicity and clarity? > Done. > > <<< With snapshot based of the in-memory state Kafka can be much more > aggressive > >>> By taking and transmitting a snapshot of the in-memory state as described > >>> below Kafka can be much more aggressive > Tough to refer to the concept of snapshot here without having > described what it is, so refer to "as described below" to help orient > the reader? > Made some changes to these sentences. I agree that fully understanding parts of the motivation section requires reading the rest of the document. I wanted to make sure that we had this in the motivation section. > > <<< In the future this mechanism will also be useful for > high-throughput topic partitions like the Group Coordinator and > Transaction Coordinator. > >>> In the future this mechanism may also be useful for high-throughput topic > >>> partitions like the Group Coordinator and Transaction Coordinator. > Tough to say "will" when that is an assumption that would depend on a KIP? > Yeah. Changed it. > > << __cluster_metadata topic partition then the ... Kafka Controller will > need to replicate 3.81 MB to each broker in the cluster (10) or 38.14 > MB.
> It might be good to append a sentence that explicitly states how much > data is replicated for the delta/event -- right now it is implied to > be very small, but that's kind of like leaving the punch line to a > joke implied :-) > Thanks. I updated the example and added numbers for the events/deltas case. > > <<< Follower and observer replicas fetch the snapshots from the leader > they attempt to fetch an offset from the leader and the leader doesn’t > have that offset in the log > >>> Follower and observer replicas fetch a snapshot from the leader when they > >>> attempt to fetch an offset from the leader and the leader doesn’t have > >>> that offset in the log > nit > Done. > > >>> Generating and loading the snapshot will be delegated to the Kafka > >>> Controller. > >>> The Kafka Controller will notify the Kafka Raft client when it has > >>> generated a snapshot and up to which offset is included in the snapshot. > >>> The Kafka Raft client will notify the Kafka Controller when a new > >>> snapshot has been fetched from the leader. > This paragraph confuses me. What is the "Kafka Raft client" -- is > this the broker? Or is it some other subsystem (or all other > subsystems aside from log
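Ron's question about the "Kafka Raft client" points at a two-way notification described in the quoted paragraph: the state machine (the Kafka Controller) tells the Raft layer when it has generated a snapshot, and the Raft layer tells the state machine when a snapshot has been fetched from the leader. A hypothetical interface sketch of that contract; the method names here are illustrative, not Kafka's actual API:

```python
# Hypothetical sketch of the two-way notification described above between
# the state machine (Kafka Controller) and the Raft replication layer.
from abc import ABC, abstractmethod

class RaftClientListener(ABC):
    """Implemented by the state machine (e.g. the Kafka Controller)."""

    @abstractmethod
    def handle_snapshot(self, snapshot_id) -> None:
        """Called by the Raft layer when a new snapshot has been fetched
        from the leader and should be loaded into memory."""

class RaftClient(ABC):
    """The replication layer (the "Kafka Raft client" in this thread)."""

    @abstractmethod
    def snapshot_frozen(self, end_offset: int, epoch: int) -> None:
        """Called by the state machine once it has finished generating a
        snapshot covering the log up to (end_offset, epoch); the log
        prefix before this point becomes safe to truncate."""
```

Splitting the contract this way keeps snapshot generation/loading in the state machine while the Raft layer only tracks which log prefix is covered.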
Re: [DISCUSS] KIP-630: Kafka Raft Snapshot
Hi Jose. Thanks for the KIP. Here are some questions and some nit corrections. <<< In KIP-500 the Kafka Controller, which is the quorum leader from KIP-595, will materialize the entries in the metadata log into memory. Technically I think the quorum leader is referred to as the Active Controller in KIP-500. Maybe replace "Kafka Controller" with "Active Controller"? I think the term "Kafka Controller" is fine as used throughout the rest of the KIP to refer to the entire thing, but when referring specifically to the leader I think "Active Controller" is the term that is defined in KIP-500. <<< Each broker in KIP-500, which will be a replica of the metadata log, will materialize the entries in the log into a metadata cache This wording confused me because I assumed that "replica" was a formal term and only (non-Active) Controllers are formally "replicas" of the metadata log -- Kafka brokers would be clients that read the log and then use the data for their own purpose as opposed to formally being replicas with this understanding of the term "replica". Is that correct, and if so, maybe replace "replica" with "client"? <<< The type of in-memory state machines what we plan to implement >>> The type of in-memory state machines that we plan to implement nit <<< doesn't map very well to an key and offset based clean up policy. >>> doesn't map very well to a key and offset based clean up policy. nit <<< When starting a broker either because it is a new broker, a broker was upgraded or a failed broker is restarting. Loading the state represented by the __cluster_metadata topic partition is required before the broker is available >>> When starting a broker either because it is a new broker or it is >>> restarting, loading the state represented by the __cluster_metadata topic >>> partition is required before the broker is available. Reword for simplicity and clarity? 
<<< With snapshot based of the in-memory state Kafka can be much more aggressive >>> By taking and transmitting a snapshot of the in-memory state as described >>> below Kafka can be much more aggressive Tough to refer to the concept of snapshot here without having described what it is, so refer to "as described below" to help orient the reader? <<< In the future this mechanism will also be useful for high-throughput topic partitions like the Group Coordinator and Transaction Coordinator. >>> In the future this mechanism may also be useful for high-throughput topic >>> partitions like the Group Coordinator and Transaction Coordinator. Tough to say "will" when that is an assumption that would depend on a KIP? <<< Follower and observer replicas fetch the snapshots from the leader they attempt to fetch an offset from the leader and the leader doesn’t have that offset in the log >>> Follower and observer replicas fetch a snapshot from the leader when they >>> attempt to fetch an offset from the leader and the leader doesn’t have that >>> offset in the log nit >>> Generating and loading the snapshot will be delegated to the Kafka >>> Controller. >>> The Kafka Controller will notify the Kafka Raft client when it has >>> generated a snapshot and up to which offset is included in the snapshot. >>> The Kafka Raft client will notify the Kafka Controller when a new snapshot >>> has been fetched from the leader. This paragraph confuses me. What is the "Kafka Raft client" -- is this the broker? Or is it some other subsystem (or all other subsystems aside from log replication) within the Controller? Has this been defined somewhere? If so it would be good to refer to that definition. (Actually, now that I've read further down, I think you refer to this as "Kafka Raft" later in the KIP; a reference to these later sections or naming it Kafka Raft Client later on would have helped me avoid confusion -- I searched the doc for raft client rather than kafka raft, so I missed this when I searched.) <<< The Kafka Controller will notify the Kafka Raft client when it has finished generating a new snapshots. Same comment about "Kafka Raft client". 
<<< It is safe for the log to truncate a prefix of the log up to the latest snapshot. "log to truncate a prefix of the log" -- That first mention of "log" needs to be something else I assume -- LogManager maybe? <<< In the example above, if the Kafka topic partition leader receives a fetch request with an offset and epoch greater than or equal to the log begin offset (x, a) <<< In the example above, offset=x, epoch=a does not appear in the diagram because it is before the log begin offset (x+1, b) If the Kafka topic partition leader receives a fetch request with an offset and epoch greater than or equal to the LBO Maybe add an explicit comment that offset=x, epoch=a does not appear in the diagram because it is before the LBO of (x+1, b)? Also need to fix the LBO reference (currently incorrectly stated as (x, a)). <<< LBO can be increase/set to an offset X if the following is true: <<< 2. One of the following is true: Do the pair of conditions in (2) only apply to the leader/Active Controller? <<< The broker will delete any snapshot with a latest offset and epoch
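The fetch-offset-versus-LBO discussion above boils down to a leader-side decision: a fetch at or beyond the log begin offset is served from the log, while an older fetch is answered by directing the follower to the latest snapshot. A simplified sketch of that decision (epoch validation and the exact response schema are elided; the dict shape is illustrative):

```python
# Sketch of the leader-side fetch handling discussed above. A follower
# fetching before the log begin offset (LBO) must bootstrap from the
# latest snapshot; otherwise the leader serves records from the log.

def handle_fetch(fetch_offset: int, log_begin_offset: int,
                 log_end_offset: int, latest_snapshot_id: tuple) -> dict:
    if fetch_offset < log_begin_offset:
        # Records before the LBO were truncated after a snapshot was
        # taken, so the follower must fetch the snapshot first.
        return {"action": "fetch_snapshot", "snapshot_id": latest_snapshot_id}
    return {"action": "read_log", "from": fetch_offset, "to": log_end_offset}

# The thread's example: LBO is x + 1, so a fetch at x must use the snapshot
# while a fetch at x + 1 can read the log.
x = 100
print(handle_fetch(x, x + 1, x + 50, (x, 1)))
print(handle_fetch(x + 1, x + 1, x + 50, (x, 1)))
```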