Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2021-09-28 Thread José Armando García Sancio
During the development of KIP-630 we made some minor changes to the
KIP to better match the implementation details. Here is a summary of
the changes we made to the KIP:

1. Added control records at the beginning and end of the snapshots. The
control records are versioned. The snapshot header record includes the
append time of the last record from the log included in the snapshot.
This is useful when determining when to delete a snapshot.
2. The configuration property metadata.snapshot.min.new_records.size
was renamed to metadata.log.max.record.bytes.between.snapshots.
3. The FetchSnapshotRequest schema was changed to include the cluster
id and the current leader epoch. This is used by the leader of the
metadata log to validate that the request matches the current cluster
id and the current leader epoch.
4. The FetchSnapshotResponse schema was changed to include the snapshot id.

KIP-630: https://cwiki.apache.org/confluence/x/exV4CQ

Thanks!
-Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-10-02 Thread Jose Garcia Sancio
I am going to propose that we take a vote on this KIP.

Thank you Jason, Ron, Jun and Guozhang for the feedback and discussion.

On Fri, Oct 2, 2020 at 3:11 PM Jose Garcia Sancio  wrote:
>
> Thank you Jun!
>
> Changes:
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=37=36
>
> > 41. Perhaps metadata.snapshot.min.records.size can just
> > be metadata.snapshot.min.records?
>
> Sounds good to me. Done.
>
> > 42. It's probably fine to change maxBytes for Fetch in a separate PR. I
> > brought this up since this KIP is changing the Fetch request.
>
> Okay. Minor nit. KIP-630 changed the Fetch response. We will bump the
> Fetch request version because it is required but the KIP doesn't make
> any changes to the fields in the Fetch request.
>
> -Jose



-- 
-Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-10-02 Thread Jose Garcia Sancio
Thank you Jun!

Changes:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=37=36

> 41. Perhaps metadata.snapshot.min.records.size can just
> be metadata.snapshot.min.records?

Sounds good to me. Done.

> 42. It's probably fine to change maxBytes for Fetch in a separate PR. I
> brought this up since this KIP is changing the Fetch request.

Okay. Minor nit. KIP-630 changed the Fetch response. We will bump the
Fetch request version because it is required but the KIP doesn't make
any changes to the fields in the Fetch request.

-Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-10-02 Thread Jun Rao
Hi, Jose,

Thanks for the updated KIP. Just a couple of minor comments.

41. Perhaps metadata.snapshot.min.records.size can just
be metadata.snapshot.min.records?

42. It's probably fine to change maxBytes for Fetch in a separate PR. I
brought this up since this KIP is changing the Fetch request.

Thanks,

Jun

On Fri, Oct 2, 2020 at 12:03 PM Jose Garcia Sancio 
wrote:

> I read through KIP-630 and made the following minor changes.
>
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=36=35
>
> --
> -Jose
>


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-10-02 Thread Jose Garcia Sancio
I read through KIP-630 and made the following minor changes.

https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=36=35

-- 
-Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-10-01 Thread Jose Garcia Sancio
Comments below.

Here are the change to the KIP:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=34=33

> 41. That's a good point. With compacted topic, the cleaning won't be done
> until the active segment rolls. With snapshots, I guess we don't have this
> restriction? So, it would be useful to guard against too frequent
> snapshotting. Does the new proposal address this completely? If the
> snapshot has only 1 record, and the new record keeps updating the same key,
> does that still cause the snapshot to be generated frequently?

That is true. In addition to metadata.snapshot.min.cleanable.ratio we
can add the following configuration:

metadata.snapshot.min.records.size - This is the minimum number of
bytes in the replicated log between the latest snapshot and the
high-watermark needed before generating a new snapshot. The default is
20MB.

Both configurations need to be satisfied before generating a new
snapshot. I have updated the KIP.
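
To make the interaction between the two configurations concrete, here is a
minimal sketch of the check described above. This is not the implementation;
the class name SnapshotTrigger and the method/parameter names are made up for
illustration, and only the "both thresholds must be satisfied" rule comes from
this discussion.

    // Illustrative only: a snapshot is generated when BOTH thresholds are met.
    final class SnapshotTrigger {
        private final double minCleanableRatio;   // metadata.snapshot.min.cleanable.ratio
        private final long minNewRecordBytes;     // metadata.snapshot.min.records.size (default 20MB)

        SnapshotTrigger(double minCleanableRatio, long minNewRecordBytes) {
            this.minCleanableRatio = minCleanableRatio;
            this.minNewRecordBytes = minNewRecordBytes;
        }

        // cleanableRatio: fraction of snapshot records changed since the last snapshot.
        // newBytesSinceSnapshot: bytes in the log between the latest snapshot and the high-watermark.
        boolean shouldGenerateSnapshot(double cleanableRatio, long newBytesSinceSnapshot) {
            return cleanableRatio >= minCleanableRatio
                && newBytesSinceSnapshot >= minNewRecordBytes;
        }
    }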

> 42. One of the reasons that we added the per partition limit is to allow
> each partition to make relatively even progress during the catchup phase.
> This helps kstreams by potentially reducing the gap between stream time
> from different partitions. If we can achieve the same thing without the
> partition limit, it will be fine too. "3. For the remaining partitions send
> at most the average max bytes plus the average remaining bytes." Do we
> guarantee that request level max_bytes can be filled up when it can? Could
> we document how we distribute the request level max_bytes to partitions in
> the KIP?

I want to allow some flexibility in the implementation. How about the
following update to the FetchSnapshot Request Handling section:

3. Send the bytes in the snapshot from Position. If there are multiple
partitions in the FetchSnapshot request, then the leader will evenly
distribute the number of bytes sent across all of the partitions. The
leader will not send more bytes in the response than ResponseMaxBytes,
the minimum of MaxBytes in the request and the value configured in
replica.fetch.response.max.bytes.
  a. Each topic partition is guaranteed to receive at least the
average of ResponseMaxBytes if that snapshot has enough bytes
remaining.
  b. If there are topic partitions with snapshots that have remaining
bytes less than the average ResponseMaxBytes, then those bytes may be
used to send snapshot bytes for other topic partitions.
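
As a rough illustration of step 3 above, a leader could serve a chunk like the
sketch below, where ResponseMaxBytes has already been computed as the minimum
of the request's MaxBytes and replica.fetch.response.max.bytes. This assumes
the snapshot is a plain file on disk; the class and method names are invented
and nothing here is prescribed by the KIP.

    import java.io.IOException;
    import java.io.RandomAccessFile;

    final class SnapshotChunkReader {
        // Reads at most maxBytes of raw snapshot data starting at position.
        // FetchSnapshot transfers opaque bytes, so a chunk does not need to end
        // on a record boundary.
        static byte[] readChunk(String snapshotPath, long position, int maxBytes) throws IOException {
            try (RandomAccessFile file = new RandomAccessFile(snapshotPath, "r")) {
                long remaining = file.length() - position;
                if (remaining < 0) {
                    throw new IllegalArgumentException("Position is past the end of the snapshot");
                }
                byte[] chunk = new byte[(int) Math.min(maxBytes, remaining)];
                file.seek(position);
                file.readFully(chunk);
                return chunk;
            }
        }
    }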

I should also point out that in practice for KIP-630 this request will
only have one topic partition (__cluster_metadata-0).

I should also point out that FetchSnapshot is sending bytes not
records so there is no requirement that the response must contain at
least one record like Fetch.

>Also, should we change Fetch accordingly?

If we want to make this change I think we should do this in another
KIP. What do you think?

> 46. If we don't have IBP, how do we make sure that a broker doesn't
> issue FetchSnapshotRequest when the receiving broker hasn't been upgraded
> yet?

For a broker to send a FetchSnapshotRequest it means that it received
a FetchResponse that contained a SnapshotId field. For the leader to
send a SnapshotId in the FetchResponse it means that the leader is
executing code that knows how to handle FetchSnapshotRequests.

The inverse is also true. For the follower to receive a SnapshotId in
the FetchResponse it means that it sent the FetchRequest to the leader
of the __cluster_metadata-0 topic partition. Only the KafkaRaftClient
will send that fetch request.
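
In other words, the upgrade ordering falls out of the protocol itself. A
hypothetical follower-side sketch (types and method names are made up, not the
KafkaRaftClient API):

    import java.util.Optional;

    final class FetchResponseHandler {
        record SnapshotId(long endOffset, int epoch) {}

        void handle(Optional<SnapshotId> snapshotIdFromFetchResponse) {
            if (snapshotIdFromFetchResponse.isPresent()) {
                // The leader says the fetch offset is only reachable through a snapshot,
                // which also proves the leader understands FetchSnapshot requests.
                sendFetchSnapshotRequest(snapshotIdFromFetchResponse.get());
            } else {
                // Normal path: keep replicating records returned by Fetch.
                continueFetchingLog();
            }
        }

        private void sendFetchSnapshotRequest(SnapshotId id) { /* ... */ }
        private void continueFetchingLog() { /* ... */ }
    }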

After writing the above, I see what you are saying. The broker needs
to know if it should enable the KafkaRaftClient and send FetchRequests
to the __cluster_metadata-0 topic partition. I think that there is
also a question of how to perform a rolling migration of a cluster
from ZK to KIP-500. I think we will write a future KIP that documents
this process.

Thanks for your help here. For now, I'll mention that we will bump the
IBP. The new wording for the "Compatibility, Deprecation, and
Migration Plan" section:

This KIP is only implemented for the internal topic
__cluster_metadata. The inter-broker protocol (IBP) will be increased
to indicate that all of the brokers in the cluster support KIP-595 and
KIP-630.

Thanks,
-Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-10-01 Thread Guozhang Wang
Thanks for the clarification Jose, that clears my confusions already :)


Guozhang

On Thu, Oct 1, 2020 at 10:51 AM Jose Garcia Sancio 
wrote:

> Thanks for the email Guozhang.
>
> > Thanks for the replies and the KIP updates. Just want to clarify one more
> > thing regarding my previous comment 3): I understand that when a snapshot
> > has completed loading, then we can use it in our handling logic of vote
> > request. And I understand that:
> >
> > 1) Before a snapshot has been completely received (e.g. if we've only
> > received a subset of the "chunks"), then we just handle vote requests "as
> > like" there's no snapshot yet.
> >
> > 2) After a snapshot has been completely received and loaded into main
> > memory, we can handle vote requests "as of" the received snapshot.
> >
> > What I'm wondering is, in between these two synchronization barriers,
> > after all the snapshot chunks have been received but before it has been
> > completely parsed and loaded into the memory's metadata cache, if we
> > received a request (note they may be handled by different threads, hence
> > concurrently), what should we do? Or are you proposing that the
> > fetchSnapshot request would also be handled in that single-threaded raft
> > client loop so it is in order with all other requests, if that's the case
> > then we do not have any concurrency issues to worry, but then on the
> other
> > hand the reception of the last snapshot chunk and loading them to main
> > memory may also take long time during which a client may not be able to
> > handle any other requests.
>
> Yes. The FetchSnapshot request and response handling will be performed
> by the KafkaRaftClient in a single threaded fashion. The
> KafkaRaftClient doesn't need to load the snapshot to know what state
> it is in. It only needs to scan the "checkpoints" folder, load the
> quorum state file and know the LEO of the replicated log. I would
> modify 2) above to the following:
>
> 3) After the snapshot has been validated by
>   a) Fetching all of the chunks
>   b) Verifying the CRC of the records in the snapshot
>   c) Atomically moving the temporary snapshot to the permanent location
>
> After 3.c), the KafkaRaftClient only needs to scan and parse the
> filenames in the directory called "checkpoints" to find the
> largest/latest permanent snapshot.
>
> As you point out in 1) before 3.c) the KafkaRaftClient, in regards to
> leader election, will behave as if the temporary snapshot didn't
> exist.
>
> The loading of the snapshot will be done by the state machine (Kafka
> Controller or Metadata Cache) and it can perform this on a different
> thread. The KafkaRaftClient will provide an API for finding and
> reading the latest valid snapshot stored locally.
>
> Are you also concerned that the snapshot could have been corrupted after
> 3.c?
>
> I also updated the "Changes to leader Election" section to make this a
> bit clearer.
>
> Thanks,
> Jose
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-10-01 Thread Jun Rao
Hi, Jose,

Thanks for the reply. A few more comments.

41. That's a good point. With compacted topic, the cleaning won't be done
until the active segment rolls. With snapshots, I guess we don't have this
restriction? So, it would be useful to guard against too frequent
snapshotting. Does the new proposal address this completely? If the
snapshot has only 1 record, and the new record keeps updating the same key,
does that still cause the snapshot to be generated frequently?

42. One of the reasons that we added the per partition limit is to allow
each partition to make relatively even progress during the catchup phase.
This helps kstreams by potentially reducing the gap between stream time
from different partitions. If we can achieve the same thing without the
partition limit, it will be fine too. "3. For the remaining partitions send
at most the average max bytes plus the average remaining bytes." Do we
guarantee that request level max_bytes can be filled up when it can? Could
we document how we distribute the request level max_bytes to partitions in
the KIP? Also, should we change Fetch accordingly?

46. If we don't have IBP, how do we make sure that a broker doesn't
issue FetchSnapshotRequest when the receiving broker hasn't been upgraded
yet?

Thanks,

Jun

On Thu, Oct 1, 2020 at 10:09 AM Jose Garcia Sancio 
wrote:

> Thank you for the quick response Jun. Excuse the delayed response but
> I wanted to confirm some things regarding IBP. See comments below.
>
> Here are my changes to the KIP:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=30=28
>
> > 40. LBO: Code wise, logStartOffset is used in so many places like Log,
> > ReplicaManager, LogCleaner, ReplicaFetcher, checkpoint files, etc. I am
> > not sure if it's worth renaming in all those places. If the main concern is to
> > avoid confusion, we can just always spell out logStartOffset.
>
> Done. Keeping it as LogStartOffset is better. I was also concerned
> with external tools that may be generating code from the JSON schema.
>
> > 41. metadata.snapshot.min.cleanable.ratio: Since the snapshot could be
> > empty initially, it's probably better to define the ratio as
> new_data_size
> > / (new_data_size + snapshot_size). This avoids the dividing by zero issue
> > and is also consistent with the cleaner ratio definition for compacted
> > topics.
>
> I am assuming that snapshot_size is the size of the largest snapshot
> on disk, is this correct? If we use this formula then we will generate
> snapshots very quickly if the snapshot on disk is zero or very small.
>
> In general what we care about is if the replicated log has a lot of
> records that delete or update records in the snapshot. I was thinking
> something along the following formula:
>
> (size of deleted snapshot records + size of updated records) / (total
> size of snapshot), where total size of snapshot is greater than zero.
> 0, where total size of snapshot is zero
>
> This means that in the extreme case where the replicated log only
> contains "addition" records then we never generate a snapshot. I think
> this is the desired behavior since generating a snapshot will consume
> disk bandwidth without saving disk space. What do you think?
>
> >
> > 42. FetchSnapshotRequest: Since this is designed to fetch more than one
> > partition, it seems it's useful to have a per-partition maxBytes, in
> > addition to the request level maxBytes, just like a Fetch request?
>
> Yeah, we have debated this in another thread from Jason. The argument
> is that MaxBytes at the top level is all that we need if we implement
> the following heuristic:
>
> 1. Compute the average max bytes per partition by dividing the max by
> the number of partitions in the request.
> 2. For all of the partitions with remaining bytes less than this
> average max bytes, then send all of those bytes and sum the remaining
> bytes.
> 3. For the remaining partitions send at most the average max bytes
> plus the average remaining bytes.
>
> Note that this heuristic will only be performed once and not at worst
> N times for N partitions.
>
> What do you think? Besides consistency with Fetch requests, is there
> another reason to have MaxBytes per partition?
>
> > 43. FetchSnapshotResponse:
> > 43.1 I think the confusing part for OFFSET_OUT_OF_RANGE is
> > that FetchSnapshotRequest includes EndOffset. So, OFFSET_OUT_OF_RANGE
> seems
> > to suggest that the provided EndOffset is wrong, which is not the
> intention
> > for the error code.
>
> Yeah. Added a new error called POSITION_OUT_OF_RANGE.
>
> > 43.1 Position field seems to be the same as the one in
> > FetchSnapshotRequest. If we have both, should the requester verify the
> > consistency between two values and what should the requester do if the
> two
> > values don't match?
>
> Yeah the Position in the response will be the same value as the
> Position in the request. I was thinking of only verifying Position
> against the state on the temporary snapshot file on disk. If Position
> is not equal to the size of the file then reject the response and send
> another FetchSnapshot request.

Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-10-01 Thread Jose Garcia Sancio
Thanks for the email Guozhang.

> Thanks for the replies and the KIP updates. Just want to clarify one more
> thing regarding my previous comment 3): I understand that when a snapshot
> has completed loading, then we can use it in our handling logic of vote
> request. And I understand that:
>
> 1) Before a snapshot has been completely received (e.g. if we've only
> received a subset of the "chunks"), then we just handle vote requests "as
> like" there's no snapshot yet.
>
> 2) After a snapshot has been completely received and loaded into main
> memory, we can handle vote requests "as of" the received snapshot.
>
> What I'm wondering is, in between these two synchronization barriers,
> after all the snapshot chunks have been received but before it has been
> completely parsed and loaded into the memory's metadata cache, if we
> received a request (note they may be handled by different threads, hence
> concurrently), what should we do? Or are you proposing that the
> fetchSnapshot request would also be handled in that single-threaded raft
> client loop so it is in order with all other requests, if that's the case
> then we do not have any concurrency issues to worry, but then on the other
> hand the reception of the last snapshot chunk and loading them to main
> memory may also take long time during which a client may not be able to
> handle any other requests.

Yes. The FetchSnapshot request and response handling will be performed
by the KafkaRaftClient in a single threaded fashion. The
KafkaRaftClient doesn't need to load the snapshot to know what state
it is in. It only needs to scan the "checkpoints" folder, load the
quorum state file and know the LEO of the replicated log. I would
modify 2) above to the following:

3) After the snapshot has been validated by
  a) Fetching all of the chunks
  b) Verifying the CRC of the records in the snapshot
  c) Atomically moving the temporary snapshot to the permanent location

After 3.c), the KafkaRaftClient only needs to scan and parse the
filenames in the directory called "checkpoints" to find the
largest/latest permanent snapshot.
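
For illustration, finding the latest snapshot without reading its contents
could look like the sketch below. It assumes a hypothetical
"<endOffset>-<epoch>.checkpoint" file naming convention; the actual naming and
the classes shown are not specified by the KIP.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Comparator;
    import java.util.Optional;
    import java.util.stream.Stream;

    final class SnapshotCatalog {
        record SnapshotId(long endOffset, int epoch) {}

        // Scans the "checkpoints" directory and picks the largest snapshot id by
        // (epoch, end offset). Only filenames are parsed; no snapshot data is read.
        static Optional<SnapshotId> latestSnapshot(Path checkpointsDir) throws IOException {
            try (Stream<Path> files = Files.list(checkpointsDir)) {
                return files
                    .map(path -> path.getFileName().toString())
                    .filter(name -> name.endsWith(".checkpoint"))
                    .map(name -> {
                        String[] parts = name
                            .substring(0, name.length() - ".checkpoint".length())
                            .split("-");
                        return new SnapshotId(Long.parseLong(parts[0]), Integer.parseInt(parts[1]));
                    })
                    .max(Comparator.comparingInt(SnapshotId::epoch)
                        .thenComparingLong(SnapshotId::endOffset));
            }
        }
    }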

As you point out in 1) before 3.c) the KafkaRaftClient, in regards to
leader election, will behave as if the temporary snapshot didn't
exist.

The loading of the snapshot will be done by the state machine (Kafka
Controller or Metadata Cache) and it can perform this on a different
thread. The KafkaRaftClient will provide an API for finding and
reading the latest valid snapshot stored locally.

Are you also concerned that the snapshot could have been corrupted after 3.c?

I also updated the "Changes to leader Election" section to make this a
bit clearer.

Thanks,
Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-10-01 Thread Jose Garcia Sancio
Thank you for the quick response Jun. Excuse the delayed response but
I wanted to confirm some things regarding IBP. See comments below.

Here are my changes to the KIP:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=30=28

> 40. LBO: Code wise, logStartOffset is used in so many places like Log,
> ReplicaManager, LogCleaner, ReplicaFetcher, checkpoint files, etc. I am not
> sure if it's worth renaming in all those places. If the main concern is to
> avoid confusion, we can just always spell out logStartOffset.

Done. Keeping it as LogStartOffset is better. I was also concerned
with external tools that may be generating code from the JSON schema.

> 41. metadata.snapshot.min.cleanable.ratio: Since the snapshot could be
> empty initially, it's probably better to define the ratio as new_data_size
> / (new_data_size + snapshot_size). This avoids the dividing by zero issue
> and is also consistent with the cleaner ratio definition for compacted
> topics.

I am assuming that snapshot_size is the size of the largest snapshot
on disk, is this correct? If we use this formula then we will generate
snapshots very quickly if the snapshot on disk is zero or very small.

In general what we care about is if the replicated log has a lot of
records that delete or update records in the snapshot. I was thinking
something along the following formula:

(size of deleted snapshot records + size of updated records) / (total
size of snapshot), where total size of snapshot is greater than zero.
0, where total size of snapshot is zero

This means that in the extreme case where the replicated log only
contains "addition" records then we never generate a snapshot. I think
this is the desired behavior since generating a snapshot will consume
disk bandwidth without saving disk space. What do you think?
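
Expressed as code, the proposed formula is a simple ratio. This is only an
illustration; the names below are made up.

    final class SnapshotDirtyRatio {
        // Ratio of snapshot bytes invalidated by the replicated log (deletes and
        // updates of records already in the snapshot) to the total snapshot size.
        // Pure additions never increase the ratio, so an append-only workload does
        // not trigger a snapshot by itself.
        static double dirtyRatio(long deletedSnapshotRecordBytes,
                                 long updatedSnapshotRecordBytes,
                                 long totalSnapshotBytes) {
            if (totalSnapshotBytes == 0) {
                return 0.0;
            }
            return (double) (deletedSnapshotRecordBytes + updatedSnapshotRecordBytes)
                / totalSnapshotBytes;
        }
    }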

>
> 42. FetchSnapshotRequest: Since this is designed to fetch more than one
> partition, it seems it's useful to have a per-partition maxBytes, in
> addition to the request level maxBytes, just like a Fetch request?

Yeah, we have debated this in another thread from Jason. The argument
is that MaxBytes at the top level is all that we need if we implement
the following heuristic:

1. Compute the average max bytes per partition by dividing the max by
the number of partitions in the request.
2. For all of the partitions with remaining bytes less than this
average max bytes, then send all of those bytes and sum the remaining
bytes.
3. For the remaining partitions send at most the average max bytes
plus the average remaining bytes.

Note that this heuristic will only be performed once and not at worst
N times for N partitions.
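
Here is a sketch of that heuristic. It is purely illustrative; the KIP only
specifies the behavior, and the class, method, and parameter names are
invented.

    import java.util.LinkedHashMap;
    import java.util.Map;

    final class FetchSnapshotMaxBytes {
        // Returns the maximum number of bytes to send for each partition so that
        // the total never exceeds requestMaxBytes.
        static Map<String, Long> distribute(Map<String, Long> remainingBytesPerPartition,
                                            long requestMaxBytes) {
            int partitionCount = remainingBytesPerPartition.size();
            if (partitionCount == 0) {
                return Map.of();
            }

            // 1. Average max bytes per partition.
            long average = requestMaxBytes / partitionCount;

            Map<String, Long> allocation = new LinkedHashMap<>();
            long unusedBudget = 0;
            int largePartitions = 0;

            // 2. Partitions with fewer remaining bytes than the average get all of
            //    their bytes; the unused portion of their budget is summed.
            for (Map.Entry<String, Long> entry : remainingBytesPerPartition.entrySet()) {
                if (entry.getValue() <= average) {
                    allocation.put(entry.getKey(), entry.getValue());
                    unusedBudget += average - entry.getValue();
                } else {
                    largePartitions++;
                }
            }

            // 3. The remaining partitions get at most the average plus their share
            //    of the unused budget.
            long bonus = largePartitions == 0 ? 0 : unusedBudget / largePartitions;
            for (Map.Entry<String, Long> entry : remainingBytesPerPartition.entrySet()) {
                allocation.putIfAbsent(entry.getKey(), average + bonus);
            }
            return allocation;
        }
    }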

What do you think? Besides consistency with Fetch requests, is there
another reason to have MaxBytes per partition?

> 43. FetchSnapshotResponse:
> 43.1 I think the confusing part for OFFSET_OUT_OF_RANGE is
> that FetchSnapshotRequest includes EndOffset. So, OFFSET_OUT_OF_RANGE seems
> to suggest that the provided EndOffset is wrong, which is not the intention
> for the error code.

Yeah. Added a new error called POSITION_OUT_OF_RANGE.

> 43.1 Position field seems to be the same as the one in
> FetchSnapshotRequest. If we have both, should the requester verify the
> consistency between two values and what should the requester do if the two
> values don't match?

Yeah the Position in the response will be the same value as the
Position in the request. I was thinking of only verifying Position
against the state on the temporary snapshot file on disk. If Position
is not equal to the size of the file then reject the response and send
another FetchSnapshot request.
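
That check is cheap because it only compares the response's Position with the
size of the temporary file, for example (names are illustrative, not from the
KIP):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    final class SnapshotPositionCheck {
        // Accept the response only if Position matches the number of bytes already
        // written to the temporary snapshot file; otherwise drop it and retry
        // FetchSnapshot from the current file size.
        static boolean accept(Path temporarySnapshotFile, long positionInResponse) throws IOException {
            long bytesAlreadyWritten = Files.size(temporarySnapshotFile);
            return positionInResponse == bytesAlreadyWritten;
        }
    }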

> 44. metric: Would a metric that captures the lag in offset between the last
> snapshot and the logEndOffset be useful?

Yes. How about the difference between the last snapshot offset and the
high-watermark? Snapshot can only be created up to the high-watermark.

Added this metric. Let me know if you still think we need a metric for
the difference between the largest snapshot end offset and the
high-watermark.

> 45. It seems the KIP assumes that every voter (leader and follower) and
> observer has a local replicated log for __cluster_metadata. It would be
> useful to make that clear in the overview section.

Updated the overview section. I think that this decision affects the
section "Changes to Leader Election". That section should not affect
observers since they don't participate in leader elections. It also
affects the section "Validation of Snapshot and Log" but it should be
possible to fix that section if observers don't have the replicated
log on disk.

> 46. Does this KIP cover upgrading from older versions of Kafka? If so, do
> we need IBP to guard the usage of modified FetchRequest and the new
> FetchSnapshotRequest? If not, could we make it clear that upgrading will be
> covered somewhere else?

In short, I don't think we need to increase the IBP. When we implement
snapshots for other topics like __consumer_offset and 

Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-09-30 Thread Guozhang Wang
Hello Jose,

Thanks for the replies and the KIP updates. Just want to clarify one more
thing regarding my previous comment 3): I understand that when a snapshot
has completed loading, then we can use it in our handling logic of vote
request. And I understand that:

1) Before a snapshot has been completely received (e.g. if we've only
received a subset of the "chunks"), then we just handle vote requests "as
like" there's no snapshot yet.

2) After a snapshot has been completely received and loaded into main
memory, we can handle vote requests "as of" the received snapshot.

What I'm wondering is, in between these two synchronization barriers,
after all the snapshot chunks have been received but before it has been
completely parsed and loaded into the memory's metadata cache, if we
received a request (note they may be handled by different threads, hence
concurrently), what should we do? Or are you proposing that the
fetchSnapshot request would also be handled in that single-threaded raft
client loop so it is in order with all other requests, if that's the case
then we do not have any concurrency issues to worry, but then on the other
hand the reception of the last snapshot chunk and loading them to main
memory may also take long time during which a client may not be able to
handle any other requests.


Guozhang

On Wed, Sep 30, 2020 at 10:57 AM Jun Rao  wrote:

> Hi, Jose,
>
> Thanks for the updated KIP. A few more comments below.
>
> 40. LBO: Code wise, logStartOffset is used in so many places like Log,
> ReplicaManager, LogCleaner, ReplicaFetcher, checkpoint files, etc. I am not
> sure if it's worth renaming in all those places. If the main concern is to
> avoid confusion, we can just always spell out logStartOffset.
>
> 41. metadata.snapshot.min.cleanable.ratio: Since the snapshot could be
> empty initially, it's probably better to define the ratio as new_data_size
> / (new_data_size + snapshot_size). This avoids the dividing by zero issue
> and is also consistent with the cleaner ratio definition for compacted
> topics.
>
> 42. FetchSnapshotRequest: Since this is designed to fetch more than one
> partition, it seems it's useful to have a per-partition maxBytes, in
> addition to the request level maxBytes, just like a Fetch request?
>
> 43. FetchSnapshotResponse:
> 43.1 I think the confusing part for OFFSET_OUT_OF_RANGE is
> that FetchSnapshotRequest includes EndOffset. So, OFFSET_OUT_OF_RANGE seems
> to suggest that the provided EndOffset is wrong, which is not the intention
> for the error code.
> 43.1 Position field seems to be the same as the one in
> FetchSnapshotRequest. If we have both, should the requester verify the
> consistency between two values and what should the requester do if the two
> values don't match?
>
> 44. metric: Would a metric that captures the lag in offset between the last
> snapshot and the logEndOffset be useful?
>
> 45. It seems the KIP assumes that every voter (leader and follower) and
> observer has a local replicated log for __cluster_metadata. It would be
> useful to make that clear in the overview section.
>
> 46. Does this KIP cover upgrading from older versions of Kafka? If so, do
> we need IBP to guard the usage of modified FetchRequest and the new
> FetchSnapshotRequest? If not, could we make it clear that upgrading will be
> covered somewhere else?
>
> Thanks,
>
> Jun
>
> On Mon, Sep 28, 2020 at 9:25 PM Jose Garcia Sancio 
> wrote:
>
> > Hi Guozhang,
> >
> > Thanks for your feedback. It was very helpful. See my comments below.
> >
> > Changes to the KIP:
> >
> >
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=28=27
> >
> > On Sun, Sep 27, 2020 at 9:02 PM Guozhang Wang 
> wrote:
> > >
> > > Hello Jose,
> > >
> > > Thanks for the KIP. Overall it looks great. I have a few meta / minor
> > > question, or maybe just clarifications below:
> > >
> > > Meta:
> > >
> > > 1. I want to clarify whether only the active controller would generate
> > > snapshots, OR any voter in the quorum would generate snapshots, OR
> > > even observers would generate snapshots? Originally I thought it was the
> > > latter case, but I think reading through the doc I got confused by some
> > > paragraphs. E.g. you mentioned snapshots are generated by the
> Controller
> > > module, and observers would not have that module.
> >
> > Sorry for the confusion and inconsistency here. Every replica of the
> > cluster metadata topic partition will generate a snapshot. That
> > includes the voters (leader and followers) and observers. In this KIP
> > the leader is the Active Controller, the voters are the Kafka
> > Controllers and the observers are the Metadata Cache.
> >
> > I went through the KIP again and made sure to enumerate both Kafka
> > Controllers and Metadata Cache when talking about snapshot generation
> > and loading.
> >
> > I renamed the new configurations to be prefixed by metadata instead of
> > controller.
> >
> I moved the terminology section to the top.

Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-09-30 Thread Jun Rao
Hi, Jose,

Thanks for the updated KIP. A few more comments below.

40. LBO: Code wise, logStartOffset is used in so many places like Log,
ReplicaManager, LogCleaner, ReplicaFetcher, checkpoint files, etc. I am not
sure if it's worth renaming in all those places. If the main concern is to
avoid confusion, we can just always spell out logStartOffset.

41. metadata.snapshot.min.cleanable.ratio: Since the snapshot could be
empty initially, it's probably better to define the ratio as new_data_size
/ (new_data_size + snapshot_size). This avoids the dividing by zero issue
and is also consistent with the cleaner ratio definition for compacted
topics.

42. FetchSnapshotRequest: Since this is designed to fetch more than one
partition, it seems it's useful to have a per-partition maxBytes, in
addition to the request level maxBytes, just like a Fetch request?

43. FetchSnapshotResponse:
43.1 I think the confusing part for OFFSET_OUT_OF_RANGE is
that FetchSnapshotRequest includes EndOffset. So, OFFSET_OUT_OF_RANGE seems
to suggest that the provided EndOffset is wrong, which is not the intention
for the error code.
43.1 Position field seems to be the same as the one in
FetchSnapshotRequest. If we have both, should the requester verify the
consistency between two values and what should the requester do if the two
values don't match?

44. metric: Would a metric that captures the lag in offset between the last
snapshot and the logEndOffset be useful?

45. It seems the KIP assumes that every voter (leader and follower) and
observer has a local replicated log for __cluster_metadata. It would be
useful to make that clear in the overview section.

46. Does this KIP cover upgrading from older versions of Kafka? If so, do
we need IBP to guard the usage of modified FetchRequest and the new
FetchSnapshotRequest? If not, could we make it clear that upgrading will be
covered somewhere else?

Thanks,

Jun

On Mon, Sep 28, 2020 at 9:25 PM Jose Garcia Sancio 
wrote:

> Hi Guozhang,
>
> Thanks for your feedback. It was very helpful. See my comments below.
>
> Changes to the KIP:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=28=27
>
> On Sun, Sep 27, 2020 at 9:02 PM Guozhang Wang  wrote:
> >
> > Hello Jose,
> >
> > Thanks for the KIP. Overall it looks great. I have a few meta / minor
> > question, or maybe just clarifications below:
> >
> > Meta:
> >
> > 1. I want to clarify whether only the active controller would generate
> > snapshots, OR any voter in the quorum would generate snapshots, OR
> > even observers would generate snapshots? Originally I thought it was the
> > latter case, but I think reading through the doc I got confused by some
> > paragraphs. E.g. you mentioned snapshots are generated by the Controller
> > module, and observers would not have that module.
>
> Sorry for the confusion and inconsistency here. Every replica of the
> cluster metadata topic partition will generate a snapshot. That
> includes the voters (leader and followers) and observers. In this KIP
> the leader is the Active Controller, the voters are the Kafka
> Controllers and the observers are the Metadata Cache.
>
> I went through the KIP again and made sure to enumerate both Kafka
> Controllers and Metadata Cache when talking about snapshot generation
> and loading.
>
> I renamed the new configurations to be prefixed by metadata instead of
> controller.
>
> I moved the terminology section to the top.
>
> >
> > 2. Following on Jun's previous comment: currently the __cluster_metadata
> > log is replicated on ALL brokers since all voters and observers would
> > replicate that topic. I know this may be out of the scope of this KIP
> but I
> > think maybe only letting the voters to replicate (and periodically
> > truncate) the log while observers only maintain the in-memory state and
> > snapshots is a good trade-off here, assuming snapshot loading is
> relatively
> > fast.
>
> This is a good idea and optimization. It would save a write. I think
> we need to think about the implication to KIP-642, the dynamic quorum
> reassignment KIP, if we end up allowing observers to get "promoted" to
> voters.
>
> >
> > 3. When a raft client is in the middle of loading a snapshot, should it
> > reject any vote / begin-/end-/describe-quorum requests at the time? More
> > generally, while a snapshot is being loaded, how should we treat the
> > current state of the client when handling Raft requests.
>
> Re: requesting votes and granting votes.
>
> In the section "Changes to Leader Election", I think this section was
> improved since your review. I mentioned that the raft client needs to
> look at:
>
> 1. latest/largest snapshot epoch and end offset
> 2. the LEO of the replicated log
>
> The voters should use the latest/largest of these two during the
> election process.
>
> Re: quorum state
>
> For KIP-595 and KIP-630 the snapshot doesn't include any quorum
> information. This may change in KIP-642.
>
> >
> > Minor:
> >
> > 

Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-09-28 Thread Jose Garcia Sancio
Hi Guozhang,

Thanks for your feedback. It was very helpful. See my comments below.

Changes to the KIP:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=28=27

On Sun, Sep 27, 2020 at 9:02 PM Guozhang Wang  wrote:
>
> Hello Jose,
>
> Thanks for the KIP. Overall it looks great. I have a few meta / minor
> question, or maybe just clarifications below:
>
> Meta:
>
> 1. I want to clarify whether only the active controller would generate
> snapshots, OR any voter in the quorum would generate snapshots, OR
> even observers would generate snapshots? Originally I thought it was the
> latter case, but I think reading through the doc I got confused by some
> paragraphs. E.g. you mentioned snapshots are generated by the Controller
> module, and observers would not have that module.

Sorry for the confusion and inconsistency here. Every replica of the
cluster metadata topic partition will generate a snapshot. That
includes the voters (leader and followers) and observers. In this KIP
the leader is the Active Controller, the voters are the Kafka
Controllers and the observers are the Metadata Cache.

I went through the KIP again and made sure to enumerate both Kafka
Controllers and Metadata Cache when talking about snapshot generation
and loading.

I renamed the new configurations to be prefixed by metadata instead of
controller.

I moved the terminology section to the top.

>
> 2. Following on Jun's previous comment: currently the __cluster_metadata
> log is replicated on ALL brokers since all voters and observers would
> replicate that topic. I know this may be out of the scope of this KIP but I
> think maybe only letting the voters to replicate (and periodically
> truncate) the log while observers only maintain the in-memory state and
> snapshots is a good trade-off here, assuming snapshot loading is relatively
> fast.

This is a good idea and optimization. It would save a write. I think
we need to think about the implication to KIP-642, the dynamic quorum
reassignment KIP, if we end up allowing observers to get "promoted" to
voters.

>
> 3. When a raft client is in the middle of loading a snapshot, should it
> reject any vote / begin-/end-/describe-quorum requests at the time? More
> generally, while a snapshot is being loaded, how should we treat the
> current state of the client when handling Raft requests.

Re: requesting votes and granting votes.

In the section "Changes to Leader Election", I think this section was
improved since your review. I mentioned that the raft client needs to
look at:

1. latest/largest snapshot epoch and end offset
2. the LEO of the replicated log

The voters should use the latest/largest of these two during the
election process.
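
A rough sketch of that comparison (types and names invented for illustration;
the real vote-granting logic lives in KIP-595's raft client):

    import java.util.Optional;

    final class VoteGranting {
        record OffsetAndEpoch(long offset, int epoch) {}

        // The voter's view of "how up to date am I" is the larger of its latest
        // snapshot id and its log end offset, compared by epoch first, then offset.
        static OffsetAndEpoch latestLocalEntry(Optional<OffsetAndEpoch> latestSnapshotId,
                                               OffsetAndEpoch logEndOffset) {
            return latestSnapshotId
                .filter(snapshot -> snapshot.epoch() > logEndOffset.epoch()
                    || (snapshot.epoch() == logEndOffset.epoch()
                        && snapshot.offset() > logEndOffset.offset()))
                .orElse(logEndOffset);
        }

        // Grant the vote only if the candidate is at least as up to date as this voter.
        static boolean grantVote(OffsetAndEpoch candidateLast, OffsetAndEpoch localLatest) {
            return candidateLast.epoch() > localLatest.epoch()
                || (candidateLast.epoch() == localLatest.epoch()
                    && candidateLast.offset() >= localLatest.offset());
        }
    }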

Re: quorum state

For KIP-595 and KIP-630 the snapshot doesn't include any quorum
information. This may change in KIP-642.

>
> Minor:
>
> 4."All of the live replicas (followers and observers) have replicated LBO".
> Today the raft layer does not yet maintain LBO across all replicas, is this
> information kept in the controller layer? I'm asking because I do not see
> relevant docs in KIP-631 and hence a bit confused which layer is
> responsible for bookkeeping the LBOs of all replicas.

This is not minor! :). This should be done in the raft client as part
of the fetch protocol. Note that LBO is just a rename of log start
offset. If the current raft implementation doesn't manage this
information then we will have to implement this as part of
implementing this KIP (KIP-630).

> 5. "Followers and observers will increase their log begin offset to the
> value sent on the fetch response as long as the local Kafka Controller and
> Metadata Cache has generated a snapshot with an end offset greater than or
> equal to the new log begin offset." Not sure I follow this: 1) If observers
> do not generate snapshots since they do not have a Controller module on
> them, then it is possible that observers do not have any snapshots at all
> if they do not get one from the leader, in that case they would never
> truncate the logs locally;

Observers will have a Metadata Cache which will be responsible for
generating snapshots.

> 2) could you clarify on "value sent on the fetch
> response", are you referring to the "HighWatermark", or "LogStartOffset" in
> the schema, or some other fields?

The log begin offset is the same as the log start offset. This KIP
renames that field in the fetch response. I am starting to think that
renaming this field in this KIP is not worth it. What do you think?

>
> 6. The term "SnapshotId" is introduced without definition at first. My
> understanding is it's defined as a combo of the epoch and end offset, could you
> clarify if this is the case?

Good point. I added this sentence to the Snapshot Format section and
terminology section:

"Each snapshot is uniquely identified by a SnapshotId, the epoch and
end offset of the records in the replicated log included in the
snapshot."

> BTW I think the term "endOffset" is a term used per log, and maybe
> calling the part of the SnapshotId "nextOffset" is better since that
> offset is likely already filled with a record.

Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-09-28 Thread Jose Garcia Sancio
Thanks for the reply Jun. Some comments below.

Here are the changes:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=27=26

> 20. Good point on metadata cache. I think we need to make a decision
> consistently. For example, if we decide that dedicated voter nodes don't
> serve metadata requests, then we don't need to expose the voters host/port
> to the client. Which KIP should make this decision?

Makes sense. My opinion is that this should be addressed in KIP-631
since I think exposing this information is independent of
snapshotting.

Note that I think there is a long term goal to make the
__cluster_metadata topic partition readable by a Kafka Consumer but we
can address that in a future KIP.

> 31. controller.snapshot.minimum.records: For a compacted topic, we use a
> ratio instead of the number of records to determine when to compact. This
> has some advantages. For example, if we use
> controller.snapshot.minimum.records and set it to 1000, then it will
> trigger the generation of a new snapshot when the existing snapshot is
> either 10MB or 1GB. Intuitively, the larger the snapshot, the more
> expensive it is to write to disk. So, we want to wait for more data to be
> accumulated before generating the next snapshot. The ratio based setting
> achieves this. For instance, a 50% ratio requires 10MB/1GB more data to be
> accumulated to regenerate a 10MB/1GB snapshot respectively.

I agree. I proposed using a simple algorithm like
"controller.snapshot.minimum.records" since calculating a dirty ratio
may not be straightforward when replicated log records don't map 1:1
to snapshot records. But I think we can implement a heuristic for
this. There is a small complication when generating the first snapshot
but it should be implementable. Here is the latest wording of the
"When to Snapshot" section:

If the Kafka Controller generates a snapshot too frequently then it
can negatively affect the performance of the disk. If it doesn't
generate a snapshot often enough then it can increase the amount of
time it takes to load its state into memory and it can increase the
amount space taken up by the replicated log. The Kafka Controller will
have a new configuration option
controller.snapshot.min.cleanable.ratio.  If the number of snapshot
records that have changed (deleted or modified) between the latest
snapshot and the current in-memory state divided by the total number
of snapshot records is greater than
controller.snapshot.min.cleanable.ratio, then the Kafka Controller
will perform a new snapshot.

Note that new snapshot records don't count against this ratio. If a
new snapshot record was added since that last snapshot then it doesn't
affect the dirty ratio. If a snapshot record was added and then
modified or deleted then it counts against the dirty ratio.
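
To illustrate the ratio described above (sketch only, with invented names):
records that existed in the last snapshot and were since deleted or modified
count as dirty, while records added after the snapshot do not.

    final class CleanableRatio {
        // changedSnapshotRecords: records from the latest snapshot that were
        //   deleted or modified by the replicated log since that snapshot.
        // totalSnapshotRecords: number of records in the latest snapshot.
        static boolean shouldSnapshot(long changedSnapshotRecords,
                                      long totalSnapshotRecords,
                                      double minCleanableRatio) {
            if (totalSnapshotRecords == 0) {
                return false; // nothing to clean yet
            }
            double ratio = (double) changedSnapshotRecords / totalSnapshotRecords;
            return ratio > minCleanableRatio;
        }
    }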

> 32. max.replication.lag.ms: It seems this is specific to the metadata
> topic. Could we make that clear in the name?

Good catch. Done.


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-09-28 Thread Jose Garcia Sancio
Thanks Jason. Some comments below.

> > Generally the number of snapshots on disk will be one. I suspect that
> users will want some control over this. We can add a configuration
> option that doesn't delete, or advances the log begin offset past, the
> N latest snapshots. We can set the default value for this
> configuration to two. What do you think?
>
> I know Zookeeper has a config like this, but I'm not sure how frequently it
> is used. I would probably suggest we pick a good number of snapshots (maybe
> just 1-2) and leave it out of the configs.
>

Sounds good to me. If followers/observers are keeping up with the
Leader, I think the description in section "When to Increase the Log
Begin Offset" will lead to one snapshot on disk in the steady state.

> > We could use the same configuration we have for Fetch but to avoid
> confusion let's add two more configurations for
> "replica.fetch.snapshot.max.bytes" and
> "replica.fetch.snapshot.response.max.bytes".
>
> My vote would probably be to reuse the existing configs. We can add new
> configs in the future if the need emerges, but I can't think of a good
> reason why a user would want these to be different.

Sounds good to me. Removed the configuration from the KIP. Updated the
FetchSnapshot request handling section to mention that the
replica.fetch.response.max.bytes configuration will be used.

> By the way, it looks like the FetchSnapshot schema now has both a partition
> level and a top level max bytes. Do we need both?

Kept the top level MaxBytes and removed the topic partition level MaxBytes.

> > The snapshot epoch will be used when ordering snapshots and more
> importantly when setting the LastFetchedEpoch in the Fetch request. It
> is possible for a follower to have a snapshot and an empty log. In
> this case the follower will use the epoch of the snapshot when setting
> the LastFetchEpoch in the Fetch request.
>
> Just to be clear, I think it is important to include the snapshot epoch so
> that we /can/ reason about the snapshot state in the presence of data loss.
> However, if we excluded data loss, then this would strictly speaking be
> unnecessary because a snapshot offset would always be uniquely defined
> (since we do not snapshot above the high watermark). Hence it would be safe
> to leave LastFetchedEpoch undefined. Anyway, I think we're on the same page
> about the behavior, just thought it might be useful to clarify the
> reasoning.

Okay. You are correct that the LastFetchedEpoch shouldn't matter since
the follower is fetching committed data. I still think
that the follower should send the epoch of the snapshot for the
LastFetchedEpoch for extra validation on the leader. What do you
think?
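
For illustration, the follower-side choice could look like this sketch (names
invented; not the KafkaRaftClient API):

    import java.util.Optional;
    import java.util.OptionalInt;

    final class LastFetchedEpochChooser {
        record SnapshotId(long endOffset, int epoch) {}

        // A follower with records in its log uses the epoch of the last record;
        // a follower with an empty log but a snapshot uses the snapshot's epoch.
        static int lastFetchedEpoch(OptionalInt lastEpochInLog, Optional<SnapshotId> latestSnapshotId) {
            if (lastEpochInLog.isPresent()) {
                return lastEpochInLog.getAsInt();
            }
            return latestSnapshotId
                .map(SnapshotId::epoch)
                .orElse(-1); // no log and no snapshot
        }
    }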


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-09-27 Thread Guozhang Wang
Hello Jose,

Thanks for the KIP. Overall it looks great. I have a few meta / minor
questions, or maybe just clarifications below:

Meta:

1. I want to clarify whether only the active controller would generate
snapshots, OR any voter in the quorum would generate snapshots, OR
even observers would generate snapshots? Originally I thought it was the
latter case, but I think reading through the doc I got confused by some
paragraphs. E.g. you mentioned snapshots are generated by the Controller
module, and observers would not have that module.

2. Following on Jun's previous comment: currently the __cluster_metadata
log is replicated on ALL brokers since all voters and observers would
replicate that topic. I know this may be out of the scope of this KIP but I
think maybe only letting the voters to replicate (and periodically
truncate) the log while observers only maintain the in-memory state and
snapshots is a good trade-off here, assuming snapshot loading is relatively
fast.

3. When a raft client is in the middle of loading a snapshot, should it
reject any vote / begin-/end-/describe-quorum requests at the time? More
generally, while a snapshot is being loaded, how should we treat the
current state of the client when handling Raft requests.

Minor:

4."All of the live replicas (followers and observers) have replicated LBO".
Today the raft layer does not yet maintain LBO across all replicas, is this
information kept in the controller layer? I'm asking because I do not see
relevant docs in KIP-631 and hence a bit confused which layer is
responsible for bookkeeping the LBOs of all replicas.

5. "Followers and observers will increase their log begin offset to the
value sent on the fetch response as long as the local Kafka Controller and
Metadata Cache has generated a snapshot with an end offset greater than or
equal to the new log begin offset." Not sure I follow this: 1) If observers
do not generate snapshots since they do not have a Controller module on
them, then it is possible that observers do not have any snapshots at all
if they do not get one from the leader, in that case they would never
truncate the logs locally; 2) could you clarify on "value sent on the fetch
response", are you referring to the "HighWatermark", or "LogStartOffset" in
the schema, or some other fields?

6. The term "SnapshotId" is introduced without definition at first. My
understanding is it's defined as a combo of the epoch and end offset, could you
clarify if this is the case? BTW I think the term "endOffset" is a term
used per log, and maybe calling the part of the SnapshotId "nextOffset" is
better since that offset is likely already filled with a record.

7. This is a very nit one: "If the latest snapshot has an epoch E and end
offset O and is it newer than the LEO of the replicated log, then the
replica must set the LBO and LEO to O." On the wiki `O` and `0` look very
much the same and that confused me a couple of times... I'd suggest we
phrase any such occasions as "an epoch e1 and offset o1". Also, for LEO,
since we would not really know what its epoch would be (since it may be
bumped), when comparing we only care about the offset and not about the
epoch, right? If yes, please clarify that in the doc as well.

8. "LEO - log end offset - the largest offset and epoch that has been
written to disk." I think LEO is the "next" offset to be written to the log
right? Also it seems consistent with your diagrams.

9. "... will send a vote request and response as if they had an empty log."
Not sure I completely follow this, do you mean that they will set
"LastOffsetEpoch/LastOffset" as "-1/0" when sending a vote request, and
upon receiving a vote request it would compare the
request's "LastOffsetEpoch/LastOffset" with "-1/0" as well?

10. In the FetchSnapshot response schema, just to clarify the "Position" :
"The byte position within the snapshot." is referring to the starting byte
position of the returned snapshot data, right?


Thanks,
Guozhang

On Fri, Sep 25, 2020 at 4:42 PM Jun Rao  wrote:

> Hi, Jose,
>
> Thanks for the reply. A few more comments below.
>
> 20. Good point on metadata cache. I think we need to make a decision
> consistently. For example, if we decide that dedicated voter nodes don't
> serve metadata requests, then we don't need to expose the voters host/port
> to the client. Which KIP should make this decision?
>
> 31. controller.snapshot.minimum.records: For a compacted topic, we use a
> ratio instead of the number of records to determine when to compact. This
> has some advantages. For example, if we use
> controller.snapshot.minimum.records and set it to 1000, then it will
> trigger the generation of a new snapshot when the existing snapshot is
> either 10MB or 1GB. Intuitively, the larger the snapshot, the more
> expensive it is to write to disk. So, we want to wait for more data to be
> accumulated before generating the next snapshot. The ratio based setting
> achieves this. For instance, a 50% ratio requires 10MB/1GB more data to be
> accumulated to regenerate a 10MB/1GB snapshot respectively.

Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-09-25 Thread Jun Rao
Hi, Jose,

Thanks for the reply. A few more comments below.

20. Good point on metadata cache. I think we need to make a decision
consistently. For example, if we decide that dedicated voter nodes don't
serve metadata requests, then we don't need to expose the voters host/port
to the client. Which KIP should make this decision?

31. controller.snapshot.minimum.records: For a compacted topic, we use a
ratio instead of the number of records to determine when to compact. This
has some advantages. For example, if we use
controller.snapshot.minimum.records and set it to 1000, then it will
trigger the generation of a new snapshot when the existing snapshot is
either 10MB or 1GB. Intuitively, the larger the snapshot, the more
expensive it is to write to disk. So, we want to wait for more data to be
accumulated before generating the next snapshot. The ratio based setting
achieves this. For instance, a 50% ratio requires 10MB/1GB more data to be
accumulated to regenerate a 10MB/1GB snapshot respectively.

32. max.replication.lag.ms: It seems this is specific to the metadata
topic. Could we make that clear in the name?

Thanks,

Jun

On Fri, Sep 25, 2020 at 12:43 PM Jose Garcia Sancio 
wrote:

> Thanks for the detailed feedback Jun.
>
> The changes are here:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=25=24
>
> Here is a summary of the change to the KIP:
> 1. Use end offset for snapshot and snapshot id.
> 2. Include default for all of the new configuration options.
> 3. Provide more detail in the response handling for FetchSnapshot
>
> > 20. "Metadata Cache: The component that generates snapshots, reads
> > snapshots and reads logs for observer replicas of the topic partition
> > __cluster_metadata." It seems this is needed on every broker, not just
> > observers?
>
> Yes. I think we need some clarification and consensus here. Some
> people are advocating for Kafka brokers to only be observers and would
> only contain a Metadata Cache. With the Kafka Controllers being
> separate nodes that are voters (follower, candidate or leader) and not
> observers. Others are advocating for Kafka Brokers to be able to host
> both the Kafka Controller and the Metadata Cache. In this case if the
> Controller and Metadata Cache are sharing the same underlying topic
> partition then we need to make sure that we unify the snapshotting
> logic.
>
> I would like to be able to unify the in-memory state for both the
> Kafka Controller and the Metadata Cache so that we can share the same
> replicated log and snapshot.
>
> > 21. Our current convention is to use exclusive offset for naming
> > checkpoint files. For example, a producer snapshot file of 1234.snapshot
> > means that the file includes the producer state up to, but not including
> > offset 1234. So, we probably want to follow the same convention for the
> new
> > checkpoint file.
>
> Thanks for pointing this out. This sounds good to me. This was a
> detail that I was struggling with when reading the replication code.
> Updated the KIP. Wherever the offset is exclusive, I renamed it to
> "end offset" (EndOffset).
>
> > 22. Snapshot Format: KIP-631 only defines the format for individual
> > records. It seems that we need to define the container format here. For
> > example, we need to store the length of each record. Also, does the
> > snapshot file need a CRC field?
>
> Yes. I have added more information on this. In summary, we are going
> to use Kafka's log format version 2. This will give us support for
> compression and CRC at the record batch level. The Kafka Controller
> and Metadata Cache can control how big they want the batches to be.
>
> > 23. Could we provide the default value for the new
> > configs controller.snapshot.minimum.records and max.replication.lag.ms.
> > Also, max.replication.lag.ms seems to just control the snapshot
> frequency
> > by time and not directly relate to replication. So, maybe it should be
> > called sth like controller.snapshot.minimum.interval.ms?
>
> "max.replication.lag.ms" is very similar to "replica.lag.time.max.ms".
> Kafka uses "replica.lag.time.max.ms" to make progress on the
> high-watermark when replicas are slow or offline. We want to use
> "max.replication.lag.ms" to make progress on the LBO when replicas are
> slow or offline. These very similar names are confusing. How about
> "replica.lbo.lag.time.max.ms"?
>
> How often snapshotting will happen is determined by
> "controller.snapshot.minimum.records".
>
> > 24. "Kafka allows the clients to delete records that are less than a
> given
> > offset by using the DeleteRecords RPC . Those requests will be validated
> > using the same logic enumerated above." Hmm, should we allow deleteRecord
> > on the metadata topic? If we do, does it trim the snapshot accordingly?
>
> Yeah. After thinking about it some more, I don't think we should
> allow DeleteRecords to succeed on the __cluster_metadata partition.
> For the error that we return it looks like our options are the
> existing "POLICY_VIOLATION" (the description for this error is
> "Request parameters do not satisfy the configured policy.") or
> introduce a new error. I think we should just return
> POLICY_VIOLATION, what do you think?

Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-09-25 Thread Jose Garcia Sancio
Thanks for the detailed feedback Jun.

The changes are here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=25=24

Here is a summary of the change to the KIP:
1. Use end offset for snapshot and snapshot id.
2. Include default for all of the new configuration options.
3. Provide more detail in the response handling for FetchSnapshot

> 20. "Metadata Cache: The component that generates snapshots, reads
> snapshots and reads logs for observer replicas of the topic partition
> __cluster_metadata." It seems this is needed on every broker, not just
> observers?

Yes. I think we need some clarification and consensus here. Some
people are advocating for Kafka brokers to only be observers and would
only contain a Metadata Cache. With the Kafka Controllers being
separate nodes that are voters (follower, candidate or leader) and not
observers. Others are advocating for Kafka Brokers to be able to host
both the Kafka Controller and the Metadata Cache. In this case if the
Controller and Metadata Cache are sharing the same underlying topic
partition then we need to make sure that we unify the snapshotting
logic.

I would like to be able to unify the in-memory state for both the
Kafka Controller and the Metadata Cache so that we can share the same
replicated log and snapshot.

> 21. Our current convention is to use exclusive offset for naming
> checkpoint files. For example, a producer snapshot file of 1234.snapshot
> means that the file includes the producer state up to, but not including
> offset 1234. So, we probably want to follow the same convention for the new
> checkpoint file.

Thanks for pointing this out. This sounds good to me. This was a
detail that I was struggling with when reading the replication code.
Updated the KIP. Wherever the offset is exclusive, I renamed it to
"end offset" (EndOffset).

> 22. Snapshot Format: KIP-631 only defines the format for individual
> records. It seems that we need to define the container format here. For
> example, we need to store the length of each record. Also, does the
> snapshot file need a CRC field?

Yes. I have added more information on this. In summary, we are going
to use Kafka's log format version 2. This will give us support for
compression and CRC at the record batch level. The Kafka Controller
and Metadata Cache can control how big they want the batches to be.
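
As a concrete illustration of what batch-level CRCs buy us when reading a
snapshot, here is a small, self-contained sketch in plain Java. These are
not the actual Kafka record classes; the real layout and the exact bytes
covered by the CRC are defined by log format version 2:

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32C;

    final class SnapshotBatchCheck {
        // Verifies a stored batch CRC against the CRC32C of the batch payload,
        // so a reader can detect a truncated or corrupted batch before applying it.
        static boolean isValid(ByteBuffer batchPayload, long storedCrc) {
            CRC32C crc = new CRC32C();
            crc.update(batchPayload.duplicate());
            return crc.getValue() == storedCrc;
        }
    }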

> 23. Could we provide the default value for the new
> configs controller.snapshot.minimum.records and max.replication.lag.ms.
> Also, max.replication.lag.ms seems to just control the snapshot frequency
> by time and not directly relate to replication. So, maybe it should be
> called sth like controller.snapshot.minimum.interval.ms?

"max.replication.lag.ms" is very similar to "replica.lag.time.max.ms".
Kafka uses "replica.lag.time.max.ms" to make progress on the
high-watermark when replicas are slow or offline. We want to use
"max.replication.lag.ms" to make progress on the LBO when replicas are
slow or offline. These very similar names are confusing. How about
"replica.lbo.lag.time.max.ms"?

How often snapshotting will happen is determined by
"controller.snapshot.minimum.records".

> 24. "Kafka allows the clients to delete records that are less than a given
> offset by using the DeleteRecords RPC . Those requests will be validated
> using the same logic enumerated above." Hmm, should we allow deleteRecord
> on the metadata topic? If we do, does it trim the snapshot accordingly?

Yeah. After thinking about it some more, I don't think we should
allow DeleteRecords to succeed on the __cluster_metadata partition.
For the error that we return, it looks like our options are the
existing "POLICY_VIOLATION" (the description for this error is
"Request parameters do not satisfy the configured policy.") or
introducing a new error. I think we should just return
POLICY_VIOLATION; what do you think?
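
A minimal sketch of the proposed handling, with made-up names (ApiError,
validateDeleteRecords) purely for illustration:

    final class DeleteRecordsPolicy {
        enum ApiError { NONE, POLICY_VIOLATION }

        // Reject DeleteRecords aimed at the metadata partition instead of
        // trimming its snapshot or log.
        static ApiError validateDeleteRecords(String topic) {
            return "__cluster_metadata".equals(topic)
                ? ApiError.POLICY_VIOLATION
                : ApiError.NONE;
        }
    }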

> 25. "The followers of the __cluster_metadata topic partition will
> concurrently fetch the snapshot and replicated log. This means that
> candidates with incomplete snapshots will send a vote request with a
> LastOffsetEpoch of -1 and a LastOffset of -1 no matter the LEO of the
> replicated log." My understanding is that a follower will either fetch from
> the snapshot or the log, but not both at the same time. Could you explain
> how the concurrent part works? Also, what's an incomplete snapshot?

Yes. I rewrote this section based on your comment and Jason's
comments. Let me know if this addresses your concerns.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-630:+Kafka+Raft+Snapshot#KIP630:KafkaRaftSnapshot-ChangestoLeaderElection

>
> 26. FetchRequest:
> 26.1 Handling Fetch Request: I agree with Jason that SnapshotOffsetAndEpoch
> already tells us the next offset to fetch. So, we don't need to
> set NextOffsetAndEpoch in the response.

Agreed. The response will set one or the other. If SnapshotId (field
renamed in the latest version of the KIP) is set then the 

Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-09-24 Thread Jason Gustafson
Thanks Jose. Makes sense overall. A few specific responses below:

> Generally the number of snapshots on disk will be one. I suspect that
users will want some control over this. We can add a configuration
option that doesn't delete, or advances the log begin offset past, the
N latest snapshots. We can set the default value for this
configuration to two. What do you think?

I know Zookeeper has a config like this, but I'm not sure how frequently it
is used. I would probably suggest we pick a good number of snapshots (maybe
just 1-2) and leave it out of the configs.

> We could use the same configuration we have for Fetch but to avoid
confusion let's add two more configurations for
"replica.fetch.snapshot.max.bytes" and
"replica.fetch.snapshot.response.max.bytes".

My vote would probably be to reuse the existing configs. We can add new
configs in the future if the need emerges, but I can't think of a good
reason why a user would want these to be different.

By the way, it looks like the FetchSnapshot schema now has both a partition
level and a top level max bytes. Do we need both?

> The snapshot epoch will be used when ordering snapshots and more
importantly when setting the LastFetchedEpoch in the Fetch request. It
is possible for a follower to have a snapshot and an empty log. In
this case the follower will use the epoch of the snapshot when setting
the LastFetchEpoch in the Fetch request.

Just to be clear, I think it is important to include the snapshot epoch so
that we /can/ reason about the snapshot state in the presence of data loss.
However, if we excluded data loss, then this would strictly speaking be
unnecessary because a snapshot offset would always be uniquely defined
(since we do not snapshot above the high watermark). Hence it would be safe
to leave LastFetchedEpoch undefined. Anyway, I think we're on the same page
about the behavior, just thought it might be useful to clarify the
reasoning.

Thanks,
Jason



On Thu, Sep 24, 2020 at 1:19 PM Jose Garcia Sancio 
wrote:

> Thanks for the feedback Jason.
>
> I have made the following changes to the KIP:
> 1. Better explanation of how followers will manage snapshots and the
> replicated log. This includes the necessary changes when granting or
> requesting votes.
> 2. How the Snapshot's epoch will be used for the LastFetchEpoch in the
> Fetch request.
> 3. New configuration options.
> 4. Changed the Fetch response to match the latest changes in KIP-595.
> 5. Changed the FetchSnapshot request to include total response max bytes.
> 6. Changed the FetchSnapshot response to return the snapshot size
> instead of the "Continue" field.
>
> Diff:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=24=23
>
> > 1. There is a comment in the proposal which suggests that we will
> maintain
> > multiple snapshots:
> >
> > > Having multiple snapshots is useful for minimizing re-fetching of the
> > snapshot when a new snapshot is generated.
> >
> > However, the document later says that snapshots get deleted as the LBO
> > advances. Just wanted to clarify the intent. Will we generally only have
> > one snapshot?
>
> Generally the number of snapshots on disk will be one. I suspect that
> users will want some control over this. We can add a configuration
> option that doesn't delete, or advances the log begin offset past, the
> N latest snapshots. We can set the default value for this
> configuration to two. What do you think?
>
> > 2. The proposal says the following:
> >
> > > During leader election, followers with incomplete or missing snapshot
> > will send a vote request and response as if they had an empty log.
> >
> > Maybe you can help me understand the scenario we're talking about since
> I'm
> > not sure I understand the point of this. If the intent is to not allow
> such
> > a follower to become leader, why would it ever become a candidate? On the
> > other hand, if the intent is to still allow it to become leader in some
> > disaster scenario, then why would it not use its latest log state? For
> > inbound Vote requests, I think it should definitely still consider its
> > latest log state when deciding whether to grant a vote.
>
> Conceptually followers will implement this algorithm:
> 1. Follower sends fetch request
> 2. Leader replies with snapshot epoch and offset
> 3. Follower pauses fetch
> 4. Follower fetches the snapshot
> 5. Follower resume fetch by
> A. Setting the LBO to the snapshot offset plus one
> B. Setting the LEO or fetch offset in the fetch request to the
> snapshot offset plus one
> C. Uses the snapshot epoch as the last fetched epoch in the fetch
> request.
>
> The problem I was trying to address is what is the state of the
> follower between bullet 4 and 5? Let's assume that the snapshot fetch
> in bullet 4 has an epoch of E and an offset of O. The follower can
> have the following state on disk after bullet 4:
>
> 1. A snapshot with offset O and epoch E.
> 2. Many snapshots 

Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-09-24 Thread Jose Garcia Sancio
Thanks for the feedback Jason.

I have made the following changes to the KIP:
1. Better explanation of how followers will manage snapshots and the
replicated log. This includes the necessary changes when granting or
requesting votes.
2. How the Snapshot's epoch will be used for the LastFetchEpoch in the
Fetch request.
3. New configuration options.
4. Changed the Fetch response to match the latest changes in KIP-595.
5. Changed the FetchSnapshot request to include total response max bytes.
6. Changed the FetchSnapshot response to return the snapshot size
instead of the "Continue" field (a rough sketch of the resulting
chunked fetch follows this list).
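
To make that concrete, here is a sketch of how a follower could page
through a snapshot using the position, the per-request max bytes, and the
returned snapshot size. The types (SnapshotLeader, Chunk) are made-up
stand-ins, not the real RPC classes:

    import java.io.IOException;
    import java.io.OutputStream;

    final class SnapshotDownloadSketch {
        record Chunk(byte[] bytes, long snapshotSize) {}

        interface SnapshotLeader {
            // Returns the bytes at the given position plus the total snapshot size.
            Chunk fetchSnapshot(long position, int maxBytes) throws IOException;
        }

        // Keep requesting chunks at increasing positions until snapshotSize bytes
        // have been read; no "Continue" flag is needed.
        static void download(SnapshotLeader leader, OutputStream out, int maxBytes)
                throws IOException {
            long position = 0;
            long size = Long.MAX_VALUE;
            while (position < size) {
                Chunk chunk = leader.fetchSnapshot(position, maxBytes);
                out.write(chunk.bytes());
                position += chunk.bytes().length;
                size = chunk.snapshotSize();
            }
        }
    }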

Diff:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=24=23

> 1. There is a comment in the proposal which suggests that we will maintain
> multiple snapshots:
>
> > Having multiple snapshots is useful for minimizing re-fetching of the
> snapshot when a new snapshot is generated.
>
> However, the document later says that snapshots get deleted as the LBO
> advances. Just wanted to clarify the intent. Will we generally only have
> one snapshot?

Generally the number of snapshots on disk will be one. I suspect that
users will want some control over this. We can add a configuration
option that doesn't delete, or advances the log begin offset past, the
N latest snapshots. We can set the default value for this
configuration to two. What do you think?

> 2. The proposal says the following:
>
> > During leader election, followers with incomplete or missing snapshot
> will send a vote request and response as if they had an empty log.
>
> Maybe you can help me understand the scenario we're talking about since I'm
> not sure I understand the point of this. If the intent is to not allow such
> a follower to become leader, why would it ever become a candidate? On the
> other hand, if the intent is to still allow it to become leader in some
> disaster scenario, then why would it not use its latest log state? For
> inbound Vote requests, I think it should definitely still consider its
> latest log state when deciding whether to grant a vote.

Conceptually followers will implement this algorithm (a rough sketch in
code follows the list):
1. Follower sends fetch request
2. Leader replies with snapshot epoch and offset
3. Follower pauses fetch
4. Follower fetches the snapshot
5. Follower resumes fetch by
A. Setting the LBO to the snapshot offset plus one
B. Setting the LEO or fetch offset in the fetch request to the
snapshot offset plus one
C. Using the snapshot epoch as the last fetched epoch in the fetch request.
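
Here is a minimal sketch of that flow. Every type below (RaftLeader,
RaftLog, SnapshotStore, OffsetAndEpoch) is a hypothetical stand-in for
illustration only; it is not the real Kafka Raft client API:

    final class FollowerFetchSketch {
        record OffsetAndEpoch(long offset, int epoch) {}

        interface FetchResult {
            boolean hasSnapshotId();      // leader wants us to fetch a snapshot first
            OffsetAndEpoch snapshotId();  // offset and epoch of that snapshot
        }

        interface RaftLeader {
            FetchResult fetch(long fetchOffset, int lastFetchedEpoch);          // step 1
            void downloadSnapshot(OffsetAndEpoch snapshotId, SnapshotStore to); // step 4
        }

        interface SnapshotStore {}

        interface RaftLog {
            long logEndOffset();
            int lastFetchedEpoch();
            void setLogBeginOffset(long newLogBeginOffset);                     // step 5.A
        }

        static void fetchOnce(RaftLeader leader, RaftLog log, SnapshotStore store) {
            // 1. send a fetch request at the current LEO
            FetchResult result = leader.fetch(log.logEndOffset(), log.lastFetchedEpoch());
            if (result.hasSnapshotId()) {
                // 2./3. the leader replied with a snapshot id, so pause normal fetching
                OffsetAndEpoch snapshot = result.snapshotId();
                // 4. fetch the snapshot
                leader.downloadSnapshot(snapshot, store);
                // 5.A set the LBO to the snapshot offset plus one
                log.setLogBeginOffset(snapshot.offset() + 1);
                // 5.B/5.C resume fetching at the snapshot offset plus one, using the
                // snapshot epoch as the last fetched epoch
                leader.fetch(snapshot.offset() + 1, snapshot.epoch());
            }
            // otherwise append the returned records to the log as usual (omitted)
        }
    }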

The problem I was trying to address is what is the state of the
follower between bullet 4 and 5? Let's assume that the snapshot fetch
in bullet 4 has an epoch of E and an offset of O. The follower can
have the following state on disk after bullet 4:

1. A snapshot with offset O and epoch E.
2. Many snapshots older/less than offset O and epoch E.
3. A replicated log with LEO older/less than offset O and epoch E.

In this case, when the follower grants a vote or becomes a candidate, it
should use the latest of all of these, which is (1.), the fetched
snapshot with offset O and epoch E.

I updated the KIP to include this description.

> 3. Are we overloading `replica.fetch.max.bytes` for snapshot fetches as
> well? It looks like we are specifying this at the partition level, but it
> might be more useful to track the maximum bytes at the request level. On a
> related note, it might be useful to think through heuristics for balancing
> between the requests in a partition. Unlike fetches, it seems like we'd
> want to complete snapshot loading partition by partition. I wonder if it
> would be simpler for FetchSnapshot to handle just one partition.

We could use the same configuration we have for Fetch but to avoid
confusion let's add two more configurations for
"replica.fetch.snapshot.max.bytes" and
"replica.fetch.snapshot.response.max.bytes".

> 4. It would help if the document motivated the need to track the snapshot
> epoch. Since we are only snapshotting below the high watermark, are you
> thinking about recovering from data loss scenarios?

I added the following paragraph to the KIP:

The snapshot epoch will be used when ordering snapshots and more
importantly when setting the LastFetchedEpoch in the Fetch request. It
is possible for a follower to have a snapshot and an empty log. In
this case the follower will use the epoch of the snapshot when setting
the LastFetchEpoch in the Fetch request.
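
In code, the rule amounts to something like the following sketch; the
types are hypothetical stand-ins, not the real Kafka Raft client API:

    import java.util.Optional;

    final class LastFetchedEpochSketch {
        record Snapshot(long offset, int epoch) {}

        interface RaftLog {
            boolean isEmpty();
            int epochOfLastRecord();
        }

        // If the log is empty but a snapshot exists, fall back to the snapshot's epoch.
        static int lastFetchedEpoch(RaftLog log, Optional<Snapshot> latestSnapshot) {
            if (log.isEmpty() && latestSnapshot.isPresent()) {
                return latestSnapshot.get().epoch();
            }
            return log.epochOfLastRecord();
        }
    }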

>
> 5. Might need to fix the following:
>
> > Otherwise, the leader will respond with the offset and epoch of the
> latest snapshot (y, c) and with the next fetch offset and epoch (y + 1, d)
>
> We ended up renaming the next fetch offset and epoch. I think we should
> just leave it empty in this case. The snapshot offset and epoch seem
> sufficient.

Done. I made some changes to the "Handling Fetch Response" section too.


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-09-22 Thread Jun Rao
Hi, Jose,

Thanks for the updated KIP. A few more comments below.

20. "Metadata Cache: The component that generates snapshots, reads
snapshots and reads logs for observer replicas of the topic partition
__cluster_metadata." It seems this is needed on every broker, not just
observers?

21. Our current convention is to use exclusive offset for naming
checkpoint files. For example, a producer snapshot file of 1234.snapshot
means that the file includes the producer state up to, but not including
offset 1234. So, we probably want to follow the same convention for the new
checkpoint file.

22. Snapshot Format: KIP-631 only defines the format for individual
records. It seems that we need to define the container format here. For
example, we need to store the length of each record. Also, does the
snapshot file need a CRC field?

23. Could we provide the default value for the new
configs controller.snapshot.minimum.records and max.replication.lag.ms.
Also, max.replication.lag.ms seems to just control the snapshot frequency
by time and not directly relate to replication. So, maybe it should be
called sth like controller.snapshot.minimum.interval.ms?

24. "Kafka allows the clients to delete records that are less than a given
offset by using the DeleteRecords RPC . Those requests will be validated
using the same logic enumerated above." Hmm, should we allow deleteRecord
on the metadata topic? If we do, does it trim the snapshot accordingly?

25. "The followers of the __cluster_metadata topic partition will
concurrently fetch the snapshot and replicated log. This means that
candidates with incomplete snapshots will send a vote request with a
LastOffsetEpoch of -1 and a LastOffset of -1 no matter the LEO of the
replicated log." My understanding is that a follower will either fetch from
the snapshot or the log, but not both at the same time. Could you explain
how the concurrent part works? Also, what's an incomplete snapshot?

26. FetchRequest:
26.1 Handling Fetch Request: I agree with Jason that SnapshotOffsetAndEpoch
already tells us the next offset to fetch. So, we don't need to
set NextOffsetAndEpoch in the response.
26.2 Is there a reason to rename LogStartOffset to LogBeginOffset? I am not
sure if they are truly identical semantically. For example, currently, the
follower moves its logStartOffset based on the leader's. Will we do the
same thing with LogBeginOffset?

27. FetchSnapshotRequest: It seems that SnapshotOffsetAndEpoch shouldn't be
optional. Also, its version number 12 is incorrect.

28. FetchSnapshotResponse: Do we need the position field? It seems it's the
same as in the request.

29. "OFFSET_OUT_OF_RANGE - when the fetch snapshot request’s offset is
greater than the size of the snapshot." By offset, do you mean position?

30. It's possible for a broker to die while copying the snapshot file from
the leader or saving its locally generated snapshot. On restart, how does
the broker know whether a local snapshot file is complete or not?

Thanks,

Jun

On Fri, Sep 18, 2020 at 1:38 PM Jason Gustafson  wrote:

> Hi Jose,
>
> A few comments/questions below:
>
> 1. There is a comment in the proposal which suggests that we will maintain
> multiple snapshots:
>
> > Having multiple snapshots is useful for minimizing re-fetching of the
> snapshot when a new snapshot is generated.
>
> However, the document later says that snapshots get deleted as the LBO
> advances. Just wanted to clarify the intent. Will we generally only have
> one snapshot?
>
> 2. The proposal says the following:
>
> > During leader election, followers with incomplete or missing snapshot
> will send a vote request and response as if they had an empty log.
>
> Maybe you can help me understand the scenario we're talking about since I'm
> not sure I understand the point of this. If the intent is to not allow such
> a follower to become leader, why would it ever become a candidate? On the
> other hand, if the intent is to still allow it to become leader in some
> disaster scenario, then why would it not use its latest log state? For
> inbound Vote requests, I think it should definitely still consider its
> latest log state when deciding whether to grant a vote.
>
> 3. Are we overloading `replica.fetch.max.bytes` for snapshot fetches as
> well? It looks like we are specifying this at the partition level, but it
> might be more useful to track the maximum bytes at the request level. On a
> related note, it might be useful to think through heuristics for balancing
> between the requests in a partition. Unlike fetches, it seems like we'd
> want to complete snapshot loading partition by partition. I wonder if it
> would be simpler for FetchSnapshot to handle just one partition.
>
> 4. It would help if the document motivated the need to track the snapshot
> epoch. Since we are only snapshotting below the high watermark, are you
> thinking about recovering from data loss scenarios?
>
> 5. Might need to fix the following:
>
> > Otherwise, the leader will 

Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-09-18 Thread Jason Gustafson
Hi Jose,

A few comments/questions below:

1. There is a comment in the proposal which suggests that we will maintain
multiple snapshots:

> Having multiple snapshots is useful for minimizing re-fetching of the
snapshot when a new snapshot is generated.

However, the document later says that snapshots get deleted as the LBO
advances. Just wanted to clarify the intent. Will we generally only have
one snapshot?

2. The proposal says the following:

> During leader election, followers with incomplete or missing snapshot
will send a vote request and response as if they had an empty log.

Maybe you can help me understand the scenario we're talking about since I'm
not sure I understand the point of this. If the intent is to not allow such
a follower to become leader, why would it ever become a candidate? On the
other hand, if the intent is to still allow it to become leader in some
disaster scenario, then why would it not use its latest log state? For
inbound Vote requests, I think it should definitely still consider its
latest log state when deciding whether to grant a vote.

3. Are we overloading `replica.fetch.max.bytes` for snapshot fetches as
well? It looks like we are specifying this at the partition level, but it
might be more useful to track the maximum bytes at the request level. On a
related note, it might be useful to think through heuristics for balancing
between the requests in a partition. Unlike fetches, it seems like we'd
want to complete snapshot loading partition by partition. I wonder if it
would be simpler for FetchSnapshot to handle just one partition.

4. It would help if the document motivated the need to track the snapshot
epoch. Since we are only snapshotting below the high watermark, are you
thinking about recovering from data loss scenarios?

5. Might need to fix the following:

> Otherwise, the leader will respond with the offset and epoch of the
latest snapshot (y, c) and with the next fetch offset and epoch (y + 1, d)

We ended up renaming the next fetch offset and epoch. I think we should
just leave it empty in this case. The snapshot offset and epoch seem
sufficient.


Thanks,
Jason

On Fri, Aug 7, 2020 at 11:33 AM Jose Garcia Sancio 
wrote:

> Thanks for your feedback Jun.
>
> Here are my changes to the KIP:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=21=20
>
> My comments are below...
>
> On Wed, Aug 5, 2020 at 1:59 PM Jun Rao  wrote:
> >
> > Hi, Jose,
> >
> > Thanks for the KIP. A few comments below.
> >
> > 10. I agree with Jason that it's useful to document the motivation a bit
> > clearer. Regarding semantic/performance, one benefit of snapshotting is
> > that it allows changes to be encoded incrementally instead of using the
> > full post image. For example, in KIP-631, each partition has multiple
> > fields like assigned replicas, leader, epoch, isr, etc. If only isr is
> > changed, the snapshotting approach allows the change to be represented
> with
> > just the new value in isr. Compaction will require all existing fields to
> > be included in order to represent just an isr change. This is just
> because
> > we can customize the combining logic with snapshotting.
> >
>
> Yes. Right now the IsrChange record from KIP-631 has both the ISR and
> the leader and epoch. I think we can split this record into two
> records:
> 1. ISR change that includes the topic, partition, and isr.
> 2. Leader change that includes the topic, partition, leader and leader
> epoch.
> I'll bring this up in the discussion thread for that KIP.
>
> > As for the
> > performance benefit, I guess in theory snapshotting allows the snapshot
> to
> > be updated in-place incrementally without having to read the full state
> in
> > the snapshot. BTW, during compaction, we only read the cleaned data once
> > instead of 3 times.
> >
>
> Doesn't compaction need to read the clean records to compare if the
> key is in the map of keys to offset? I made the following changes to
> the KIP:
>
> 2. With log compaction the broker needs to
>   a. read 1MB/s from the head of the log to update the in-memory state
>   b. read 1MB/s to update the map of keys to offsets
>   c. read 3MB/s (100MB from the already compacted log, 50MB from the
> new key-value records) from the older segments. The log will
> accumulate 50MB in 50 seconds worth of changes before compacting
> because the default configuration has a minimum clean ratio of 50%.
>
> The "100MB in the already compacted log" are these cleaned records.
> Let me know what you think and if I am missing something.
>
> > 11. The KIP mentions topic id. Currently there is no topic id. Does this
> > KIP depend on KIP-516?
> >
>
> For the purpose of measuring the impact, I was using the records
> proposed by KIP-631. This KIP doesn't depend on KIP-516 or KIP-631 for
> its design and implementation. I was just referencing that KIP in the
> motivation and analysis. The KIP only assumes the changes in KIP-595
> which has been approved but 

Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-08-07 Thread Jose Garcia Sancio
Thanks for your feedback Jun.

Here are my changes to the KIP:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=21=20

My comments are below...

On Wed, Aug 5, 2020 at 1:59 PM Jun Rao  wrote:
>
> Hi, Jose,
>
> Thanks for the KIP. A few comments below.
>
> 10. I agree with Jason that it's useful to document the motivation a bit
> clearer. Regarding semantic/performance, one benefit of snapshotting is
> that it allows changes to be encoded incrementally instead of using the
> full post image. For example, in KIP-631, each partition has multiple
> fields like assigned replicas, leader, epoch, isr, etc. If only isr is
> changed, the snapshotting approach allows the change to be represented with
> just the new value in isr. Compaction will require all existing fields to
> be included in order to represent just an isr change. This is just because
> we can customize the combining logic with snapshotting.
>

Yes. Right now the IsrChange record from KIP-631 has both the ISR and
the leader and epoch. I think we can split this record into two
records:
1. ISR change that includes the topic, partition, and isr.
2. Leader change that includes the topic, partition, leader and leader epoch.
I'll bring this up in the discussion thread for that KIP.

> As for the
> performance benefit, I guess in theory snapshotting allows the snapshot to
> be updated in-place incrementally without having to read the full state in
> the snapshot. BTW, during compaction, we only read the cleaned data once
> instead of 3 times.
>

Doesn't compaction need to read the clean records to compare if the
key is in the map of keys to offset? I made the following changes to
the KIP:

2. With log compaction the broker needs to
  a. read 1MB/s from the head of the log to update the in-memory state
  b. read 1MB/s to update the map of keys to offsets
  c. read 3MB/s (100MB from the already compacted log, 50MB from the
new key-value records) from the older segments. The log will
accumulate 50MB in 50 seconds worth of changes before compacting
because the default configuration has a minimum clean ratio of 50%.

The "100MB in the already compacted log" are these cleaned records.
Let me know what you think and if I am missing something.

> 11. The KIP mentions topic id. Currently there is no topic id. Does this
> KIP depend on KIP-516?
>

For the purpose of measuring the impact, I was using the records
proposed by KIP-631. This KIP doesn't depend on KIP-516 or KIP-631 for
its design and implementation. I was just referencing that KIP in the
motivation and analysis. The KIP only assumes the changes in KIP-595,
which has been approved but is not yet part of trunk.

In the overview section the KIP mentions: "This KIP assumes that
KIP-595 has been approved and implemented. "

> 12. Is there a need to keep more than 1 snapshot? It seems we always expose
> the latest snapshot to clients.
>

The KIP proposes keeping more than one snapshot so as not to invalidate
any pending/concurrent `FetchSnapshot` requests that are attempting to
fetch a snapshot that can be deleted. I'll remove this wording as the
first version of this implementation probably won't have this feature
since it requires extra coordination. The implementation will still allow
for multiple snapshots because generating a snapshot is not atomic
with respect to increasing the LBO.


> 13. "During leader election, followers with incomplete or missing snapshot
> will send a vote request and response as if they had an empty log." Hmm, a
> follower may not have a snapshot created, but that doesn't imply its log is
> empty.
>

Yes. I fixed the "Validation of Snapshot and Log" section and that
sentence. I basically added an additional condition: a snapshot is not
required if the LBO is 0.
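
To spell out the check, here is a rough sketch of that validation. The
names are illustrative only; the authoritative rule is in the "Validation
of Snapshot and Log" section of the KIP:

    import java.util.List;

    final class SnapshotLogValidationSketch {
        // The state is usable if the log starts at offset 0 (no snapshot required),
        // or if some snapshot's offset falls between LogBeginOffset - 1 and the
        // high-watermark. Otherwise the replica should treat the log as empty.
        static boolean isValid(long logBeginOffset, long highWatermark,
                               List<Long> snapshotOffsets) {
            if (logBeginOffset == 0) {
                return true;
            }
            for (long snapshotOffset : snapshotOffsets) {
                if (snapshotOffset >= logBeginOffset - 1 && snapshotOffset <= highWatermark) {
                    return true;
                }
            }
            return false;
        }
    }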

> 14. "LBO is max.replication.lag.ms old." Not sure that I follow. How do we
> compare an offset to a time?
>

Yeah. This may be hard to implement. I am trying to avoid invalidating
followers and observers by aggressively deleting an offset/record
which they are trying to fetch. It is possible that
`controller.snapshot.minimum.records` is good enough to throttle
increasing LBO.

> 15. "Followers and observers will increase their log begin offset to the
> value sent on the fetch response as long as the local state machine has
> generated a snapshot that includes to the log begin offset minus one." Does
> the observer store the log? I thought it only needed to maintain a
> snapshot. If so, the observer doesn't need to maintain LBO.
>

In KIP-595 observers are similar to voters/followers in that they
Fetch the log and the snapshot. Two of the distinctions are that they
don't participate in the leader election and they are not included
when computing the high-watermark. Regarding storing: it is possible
that observers never need to store the log since they don't vote or
become leaders. I think in the future we would like to implement
"KIP-642: Dynamic quorum reassignment" which would add the capability
to 

Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-08-05 Thread Jun Rao
Hi, Jose,

Thanks for the KIP. A few comments below.

10. I agree with Jason that it's useful to document the motivation a bit
clearer. Regarding semantic/performance, one benefit of snapshotting is
that it allows changes to be encoded incrementally instead of using the
full post image. For example, in KIP-631, each partition has multiple
fields like assigned replicas, leader, epoch, isr, etc. If only isr is
changed, the snapshotting approach allows the change to be represented with
just the new value in isr. Compaction will require all existing fields to
be included in order to represent just an isr change. This is just because
we can customize the combining logic with snapshotting. As for the
performance benefit, I guess in theory snapshotting allows the snapshot to
be updated in-place incrementally without having to read the full state in
the snapshot. BTW, during compaction, we only read the cleaned data once
instead of 3 times.

11. The KIP mentions topic id. Currently there is no topic id. Does this
KIP depend on KIP-516?

12. Is there a need to keep more than 1 snapshot? It seems we always expose
the latest snapshot to clients.

13. "During leader election, followers with incomplete or missing snapshot
will send a vote request and response as if they had an empty log." Hmm, a
follower may not have a snapshot created, but that doesn't imply its log is
empty.

14. "LBO is max.replication.lag.ms old." Not sure that I follow. How do we
compare an offset to a time?

15. "Followers and observers will increase their log begin offset to the
value sent on the fetch response as long as the local state machine has
generated a snapshot that includes to the log begin offset minus one." Does
the observer store the log? I thought it only needed to maintain a
snapshot. If so, the observer doesn't need to maintain LBO.

16. "There are two cases when the Kafka Controller needs to load the
snapshot: When it is booting. When the follower and observer replicas
finishes fetching a new snapshot from the leader." For faster failover, it
seems it's useful for a non-controller voter to maintain the in-memory
metadata state. In order to do that, it seems that every voter needs to
load the snapshot on booting?

17. "There is an invariant that every log must have at least one snapshot
where the offset of the snapshot is between LogBeginOffset - 1 and
High-Watermark. If this is not the case then the replica should assume that
the log is empty." Does that invariant hold when there is no snapshot
initially?

18. "The __cluster_metadata topic will have an cleanup.policy value of
snapshot" Is there a need to make this configurable if it's read-only?

19. OFFSET_OUT_OF_RANGE in FetchSnapshotResponse: It seems that
POSITION_OUT_OF_RANGE is more appropriate?

Thanks,

Jun

On Wed, Aug 5, 2020 at 12:13 PM Jose Garcia Sancio 
wrote:

> Once again, thanks for the feedback Jason,
>
> My changes to the KIP are here:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=18=17
>
> And see my comments below...
>
> On Mon, Aug 3, 2020 at 1:57 PM Jason Gustafson  wrote:
> >
> > Hi Jose,
> >
> > Thanks for the proposal. I think there are three main motivations for
> > snapshotting over the existing compaction semantics.
> >
> > First we are arguing that compaction is a poor semantic fit for how we
> want
> > to model the metadata in the cluster. We are trying to view the changes
> in
> > the cluster as a stream of events, not necessarily as a stream of
> key/value
> > updates. The reason this is useful is that a single event may correspond
> to
> > a set of key/value updates. We don't need to delete each partition
> > individually for example if we are deleting the full topic. Outside of
> > deletion, however, the benefits of this approach are less obvious. I am
> > wondering if there are other cases where the event-based approach has
> some
> > benefit?
> >
>
> Yes. Another example of this is what KIP-631 calls FenceBroker. In the
> current implementation of the Kafka Controller and the implementation
> proposed in
> KIP-631, whenever a broker is fenced the controller removes the broker
> from the ISR and performs leader election if necessary. The impact of
> this operation on replication is documented in section "Amount of Data
> Replicated". I have also updated the KIP to reflect this.
>
> > The second motivation is from the perspective of consistency. Basically
> we
> > don't like the existing solution for the tombstone deletion problem,
> which
> > is just to add a delay before removal. The case we are concerned about
> > requires a replica to fetch up to a specific offset and then stall for a
> > time which is longer than the deletion retention timeout. If this
> happens,
> > then the replica might not see the tombstone, which would lead to an
> > inconsistent state. I think we are already talking about a rare case,
> but I
> > wonder if there are simple ways to tighten it further. For the sake of
> > 

Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-08-05 Thread Jose Garcia Sancio
Once again, thanks for the feedback Jason,

My changes to the KIP are here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=18=17

And see my comments below...

On Mon, Aug 3, 2020 at 1:57 PM Jason Gustafson  wrote:
>
> Hi Jose,
>
> Thanks for the proposal. I think there are three main motivations for
> snapshotting over the existing compaction semantics.
>
> First we are arguing that compaction is a poor semantic fit for how we want
> to model the metadata in the cluster. We are trying to view the changes in
> the cluster as a stream of events, not necessarily as a stream of key/value
> updates. The reason this is useful is that a single event may correspond to
> a set of key/value updates. We don't need to delete each partition
> individually for example if we are deleting the full topic. Outside of
> deletion, however, the benefits of this approach are less obvious. I am
> wondering if there are other cases where the event-based approach has some
> benefit?
>

Yes. Another example of this is what KIP-631 calls FenceBroker. In the
current implementation of the Kafka Controller and the implementation
proposed in
KIP-631, whenever a broker is fenced the controller removes the broker
from the ISR and performs leader election if necessary. The impact of
this operation on replication is documented in section "Amount of Data
Replicated". I have also updated the KIP to reflect this.

> The second motivation is from the perspective of consistency. Basically we
> don't like the existing solution for the tombstone deletion problem, which
> is just to add a delay before removal. The case we are concerned about
> requires a replica to fetch up to a specific offset and then stall for a
> time which is longer than the deletion retention timeout. If this happens,
> then the replica might not see the tombstone, which would lead to an
> inconsistent state. I think we are already talking about a rare case, but I
> wonder if there are simple ways to tighten it further. For the sake of
> argument, what if we had the replica start over from the beginning whenever
> there is a replication delay which is longer than tombstone retention time?
> Just want to be sure we're not missing any simple/pragmatic solutions
> here...
>

We explore the changes needed to log compaction and the fetch protocol
such that it results in a consistent replicated log in the rejected
alternatives sections. I changed the KIP to also mention it in the
motivation section by adding a section called "Consistent Log and
Tombstones".

> Finally, I think we are arguing that compaction gives a poor performance
> tradeoff when the state is already in memory. It requires us to read and
> replay all of the changes even though we already know the end result. One
> way to think about it is that compaction works O(the rate of changes) while
> snapshotting is O(the size of data). Contrarily, the nice thing about
> compaction is that it works irrespective of the size of the data, which
> makes it a better fit for user partitions. I feel like this might be an
> argument we can make empirically or at least with back-of-the-napkin
> calculations. If we assume a fixed size of data and a certain rate of
> change, then what are the respective costs of snapshotting vs compaction? I
> think compaction fares worse as the rate of change increases. In the case
> of __consumer_offsets, which sometimes has to support a very high rate of
> offset commits, I think snapshotting would be a great tradeoff to reduce
> load time on coordinator failover. The rate of change for metadata on the
> other hand might not be as high, though it can be very bursty.
>

This is a very good observation. If you assume that the number of keys
doesn't change but that we have frequent updates to their values, then I
think that after log compaction the size of the compacted section of
the log is O(size of the data) + O(size of the tombstones). And as you
point out the size of the snapshot is also O(size of the data). I
think this is a reasonable assumption for topics like
__cluster_metadata and __consumer_offsets.

The difference is the number of reads required. With in-memory
snapshots we only need to read the log once. With log compaction we
need to read the log 3 times: 1. to update the in-memory state, 2. to
generate the map of keys to offsets and 3. to compact the log using the
map of keys to offsets. I have updated the KIP and go into a lot more
detail in the section "Loading State and Frequency of Compaction".
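
To make the read pattern concrete, here is a simplified sketch of the two
extra passes that compaction needs, beyond the pass that updates the
in-memory state: one to build the map of keys to offsets and one to
rewrite the log using it. The types are stand-ins, not Kafka's LogCleaner:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    final class CompactionSketch {
        record LogRecord(String key, long offset, byte[] value) {}

        static List<LogRecord> compact(List<LogRecord> segment) {
            // Second read: map each key to the offset of its latest record.
            Map<String, Long> latestOffset = new HashMap<>();
            for (LogRecord r : segment) {
                latestOffset.merge(r.key(), r.offset(), Math::max);
            }
            // Third read: keep only the records that are still the latest for their key.
            List<LogRecord> compacted = new ArrayList<>();
            for (LogRecord r : segment) {
                if (latestOffset.get(r.key()) == r.offset()) {
                    compacted.add(r);
                }
            }
            return compacted;
        }
    }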

-- 
-Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-08-04 Thread Jose Garcia Sancio
Thanks for your feedback Jason. I'll have a more detailed reply and
update to the KIP by EOD today.

On Mon, Aug 3, 2020 at 1:57 PM Jason Gustafson  wrote:
>
> Hi Jose,
>
> Thanks for the proposal. I think there are three main motivations for
> snapshotting over the existing compaction semantics.
>
> First we are arguing that compaction is a poor semantic fit for how we want
> to model the metadata in the cluster. We are trying to view the changes in
> the cluster as a stream of events, not necessarily as a stream of key/value
> updates. The reason this is useful is that a single event may correspond to
> a set of key/value updates. We don't need to delete each partition
> individually for example if we are deleting the full topic. Outside of
> deletion, however, the benefits of this approach are less obvious. I am
> wondering if there are other cases where the event-based approach has some
> benefit?
>
> The second motivation is from the perspective of consistency. Basically we
> don't like the existing solution for the tombstone deletion problem, which
> is just to add a delay before removal. The case we are concerned about
> requires a replica to fetch up to a specific offset and then stall for a
> time which is longer than the deletion retention timeout. If this happens,
> then the replica might not see the tombstone, which would lead to an
> inconsistent state. I think we are already talking about a rare case, but I
> wonder if there are simple ways to tighten it further. For the sake of
> argument, what if we had the replica start over from the beginning whenever
> there is a replication delay which is longer than tombstone retention time?
> Just want to be sure we're not missing any simple/pragmatic solutions
> here...
>
> Finally, I think we are arguing that compaction gives a poor performance
> tradeoff when the state is already in memory. It requires us to read and
> replay all of the changes even though we already know the end result. One
> way to think about it is that compaction works O(the rate of changes) while
> snapshotting is O(the size of data). Contrarily, the nice thing about
> compaction is that it works irrespective of the size of the data, which
> makes it a better fit for user partitions. I feel like this might be an
> argument we can make empirically or at least with back-of-the-napkin
> calculations. If we assume a fixed size of data and a certain rate of
> change, then what are the respective costs of snapshotting vs compaction? I
> think compaction fares worse as the rate of change increases. In the case
> of __consumer_offsets, which sometimes has to support a very high rate of
> offset commits, I think snapshotting would be a great tradeoff to reduce
> load time on coordinator failover. The rate of change for metadata on the
> other hand might not be as high, though it can be very bursty.
>
> Thanks,
> Jason
>
>
> On Wed, Jul 29, 2020 at 2:03 PM Jose Garcia Sancio 
> wrote:
>
> > Thanks Ron for the additional comments and suggestions.
> >
> > Here are the changes to the KIP:
> >
> > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=17=15
> >
> > On Wed, Jul 29, 2020 at 8:44 AM Ron Dagostino  wrote:
> > >
> > > Thanks, Jose.  It's looking good.  Here is one minor correction:
> > >
> > > <<< If the Kafka topic partition leader receives a fetch request with an
> > > offset and epoch greater than or equal to the LBO (x + 1, a)
> > > >>> If the Kafka topic partition leader receives a fetch request with an
> > > offset and epoch greater than or equal to the LBO (x + 1, b)
> > >
> >
> > Done.
> >
> > > Here is one more question.  Is there an ability to evolve the snapshot
> > > format over time, and if so, how is that managed for upgrades? It would
> > be
> > > both Controllers and Brokers that would depend on the format, correct?
> > > Those could be the same thing if the controller was running inside the
> > > broker JVM, but that is an option rather than a requirement, I think.
> > > Might the Controller upgrade have to be coordinated with the broker
> > upgrade
> > > in the separate-JVM case?  Perhaps a section discussing this would be
> > > appropriate?
> > >
> >
> > The content sent through the FetchSnapshot RPC is expected to be
> > compatible with future changes. In KIP-631 the Kafka Controller is
> > going to use the existing Kafka Message and versioning scheme.
> > Specifically see section "Record Format Versions". I added some
> > wording around this.
> >
> > Thanks!
> > -Jose
> >



-- 
-Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-08-03 Thread Jason Gustafson
Hi Jose,

Thanks for the proposal. I think there are three main motivations for
snapshotting over the existing compaction semantics.

First we are arguing that compaction is a poor semantic fit for how we want
to model the metadata in the cluster. We are trying to view the changes in
the cluster as a stream of events, not necessarily as a stream of key/value
updates. The reason this is useful is that a single event may correspond to
a set of key/value updates. We don't need to delete each partition
individually for example if we are deleting the full topic. Outside of
deletion, however, the benefits of this approach are less obvious. I am
wondering if there are other cases where the event-based approach has some
benefit?

The second motivation is from the perspective of consistency. Basically we
don't like the existing solution for the tombstone deletion problem, which
is just to add a delay before removal. The case we are concerned about
requires a replica to fetch up to a specific offset and then stall for a
time which is longer than the deletion retention timeout. If this happens,
then the replica might not see the tombstone, which would lead to an
inconsistent state. I think we are already talking about a rare case, but I
wonder if there are simple ways to tighten it further. For the sake of
argument, what if we had the replica start over from the beginning whenever
there is a replication delay which is longer than tombstone retention time?
Just want to be sure we're not missing any simple/pragmatic solutions
here...

Finally, I think we are arguing that compaction gives a poor performance
tradeoff when the state is already in memory. It requires us to read and
replay all of the changes even though we already know the end result. One
way to think about it is that compaction works O(the rate of changes) while
snapshotting is O(the size of data). Contrarily, the nice thing about
compaction is that it works irrespective of the size of the data, which
makes it a better fit for user partitions. I feel like this might be an
argument we can make empirically or at least with back-of-the-napkin
calculations. If we assume a fixed size of data and a certain rate of
change, then what are the respective costs of snapshotting vs compaction? I
think compaction fares worse as the rate of change increases. In the case
of __consumer_offsets, which sometimes has to support a very high rate of
offset commits, I think snapshotting would be a great tradeoff to reduce
load time on coordinator failover. The rate of change for metadata on the
other hand might not be as high, though it can be very bursty.

Thanks,
Jason


On Wed, Jul 29, 2020 at 2:03 PM Jose Garcia Sancio 
wrote:

> Thanks Ron for the additional comments and suggestions.
>
> Here are the changes to the KIP:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=17=15
>
> On Wed, Jul 29, 2020 at 8:44 AM Ron Dagostino  wrote:
> >
> > Thanks, Jose.  It's looking good.  Here is one minor correction:
> >
> > <<< If the Kafka topic partition leader receives a fetch request with an
> > offset and epoch greater than or equal to the LBO (x + 1, a)
> > >>> If the Kafka topic partition leader receives a fetch request with an
> > offset and epoch greater than or equal to the LBO (x + 1, b)
> >
>
> Done.
>
> > Here is one more question.  Is there an ability to evolve the snapshot
> > format over time, and if so, how is that managed for upgrades? It would
> be
> > both Controllers and Brokers that would depend on the format, correct?
> > Those could be the same thing if the controller was running inside the
> > broker JVM, but that is an option rather than a requirement, I think.
> > Might the Controller upgrade have to be coordinated with the broker
> upgrade
> > in the separate-JVM case?  Perhaps a section discussing this would be
> > appropriate?
> >
>
> The content sent through the FetchSnapshot RPC is expected to be
> compatible with future changes. In KIP-631 the Kafka Controller is
> going to use the existing Kafka Message and versioning scheme.
> Specifically see section "Record Format Versions". I added some
> wording around this.
>
> Thanks!
> -Jose
>


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-07-29 Thread Jose Garcia Sancio
Thanks Ron for the additional comments and suggestions.

Here are the changes to the KIP:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=17=15

On Wed, Jul 29, 2020 at 8:44 AM Ron Dagostino  wrote:
>
> Thanks, Jose.  It's looking good.  Here is one minor correction:
>
> <<< If the Kafka topic partition leader receives a fetch request with an
> offset and epoch greater than or equal to the LBO (x + 1, a)
> >>> If the Kafka topic partition leader receives a fetch request with an
> offset and epoch greater than or equal to the LBO (x + 1, b)
>

Done.

> Here is one more question.  Is there an ability to evolve the snapshot
> format over time, and if so, how is that managed for upgrades? It would be
> both Controllers and Brokers that would depend on the format, correct?
> Those could be the same thing if the controller was running inside the
> broker JVM, but that is an option rather than a requirement, I think.
> Might the Controller upgrade have to be coordinated with the broker upgrade
> in the separate-JVM case?  Perhaps a section discussing this would be
> appropriate?
>

The content sent through the FetchSnapshot RPC is expected to be
compatible with future changes. In KIP-631 the Kafka Controller is
going to use the existing Kafka Message and versioning scheme.
Specifically see section "Record Format Versions". I added some
wording around this.

Thanks!
-Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-07-29 Thread Ron Dagostino
Thanks, Jose.  It's looking good.  Here is one minor correction:

<<< If the Kafka topic partition leader receives a fetch request with an
offset and epoch greater than or equal to the LBO (x + 1, a)
>>> If the Kafka topic partition leader receives a fetch request with an
offset and epoch greater than or equal to the LBO (x + 1, b)

Here is one more question.  Is there an ability to evolve the snapshot
format over time, and if so, how is that managed for upgrades? It would be
both Controllers and Brokers that would depend on the format, correct?
Those could be the same thing if the controller was running inside the
broker JVM, but that is an option rather than a requirement, I think.
Might the Controller upgrade have to be coordinated with the broker upgrade
in the separate-JVM case?  Perhaps a section discussing this would be
appropriate?

Ron


On Tue, Jul 28, 2020 at 11:14 PM Jose Garcia Sancio 
wrote:

> Thanks Ron. Your comments and suggestions were helpful. You can see my
> changes to the KIP here:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=15=14
>
> My comments are below...
>
> On Mon, Jul 27, 2020 at 11:29 AM Ron Dagostino  wrote:
> >
> > Hi Jose.  Thanks for the KIP.  Here are some questions and some nit
> corrections.
> >
> > <<< In KIP-500 the Kafka Controller, which is the quorum leader from
> > KIP-595, will materialize the entries in the metadata log into memory.
> > Technically I think the quorum leader is referred to as the Active
> > Controller in KIP-500.  Maybe replace "Kafka Controller" with "Active
> > Controller"?  I think the term "Kafka Controller" is fine as used
> > throughout the rest of the KIP to refer to the entire thing, but when
> > referring specifically to the leader I think "Active Controller" is
> > the term that is defined in KIP-500.
> >
>
> Made those changes.
>
> >
> > <<< Each broker in KIP-500, which will be a replica of the metadata
> > log, will materialize the entries in the log into a metadata cache
> > This wording confused me because I assumed that "replica" was a formal
> > term and only (non-Active) Controllers are formally "replicas" of the
> > metadata log -- Kafka brokers would be clients that read the log and
> > then use the data for their own purpose as opposed to formally being
> > replicas with this understanding of the term "replica".  Is that
> > correct, and if so, maybe replace "replica" with "client"?
> >
>
> In KIP-595 we have two types of replicas: voters and observers. Voter
> replicas are Kafka Controllers and one of them will become the
> Active Controller. Observer replicas fetch from the log and attempt to
> keep up with the LEO of the Active Controller. I think you can
> consider all of them as "client" of the replicated log.
>
> >
> > <<< The type of in-memory state machines what we plan to implement
> > >>> The type of in-memory state machines that we plan to implement
> > nit
> >
>
> Done.
>
> >
> > <<< doesn't map very well to an key and offset based clean up policy.
> > >>> doesn't map very well to a key and offset based clean up policy.
> > nit
> >
>
> Done.
>
> >
> > <<< When starting a broker either because it is a new broker, a broker
> > was upgraded or a failed broker is restarting. Loading the state
> > represented by the __cluster_metadata topic partition is required
> > before the broker is available
> > >>> When starting a broker either because it is a new broker or it is
> restarting, loading the state represented by the __cluster_metadata topic
> partition is required before the broker is available.
> > Reword for simplicity and clarity?
> >
>
> Done.
>
> >
> > <<< With snapshot based of the in-memory state Kafka can be much more
> aggressive
> > >>> By taking and transmitting a snapshot of the in-memory state as
> described below Kafka can be much more aggressive
> > Tough to refer to the concept of snapshot here without having
> > described what it is, so refer to "as described below" to help orient
> > the reader?
> >
>
> Made some changes to these sentences. I agree that fully understanding
> parts of the motivated section requires reading the rest of the
> document. I wanted to make sure that we had this in the motivation
> section.
>
> >
> > <<< In the future this mechanism will also be useful for
> > high-throughput topic partitions like the Group Coordinator and
> > Transaction Coordinator.
> > >>> In the future this mechanism may also be useful for high-throughput
> topic partitions like the Group Coordinator and Transaction Coordinator.
> > Tough to say "will" when that is an assumption that would depend on a
> KIP?
> >
>
> Yeah. Changed it.
>
> >
> > << > __cluster_metadata topic partition then the ... Kafka Controller will
> > need to replicate 3.81 MB to each broker in the cluster (10) or 38.14
> > MB.
> > It might be good to append a sentence that explicitly states how much
> > data is replicated for the delta/event -- right now it is implied to
> > be 

Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-07-28 Thread Jose Garcia Sancio
Thanks Ron. Your comments and suggestions were helpful. You can see my
changes to the KIP here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=15=14

My comments are below...

On Mon, Jul 27, 2020 at 11:29 AM Ron Dagostino  wrote:
>
> Hi Jose.  Thanks for the KIP.  Here are some questions and some nit 
> corrections.
>
> <<< In KIP-500 the Kafka Controller, which is the quorum leader from
> KIP-595, will materialize the entries in the metadata log into memory.
> Technically I think the quorum leader is referred to as the Active
> Controller in KIP-500.  Maybe replace "Kafka Controller" with "Active
> Controller"?  I think the term "Kafka Controller" is fine as used
> throughout the rest of the KIP to refer to the entire thing, but when
> referring specifically to the leader I think "Active Controller" is
> the term that is defined in KIP-500.
>

Made those changes.

>
> <<< Each broker in KIP-500, which will be a replica of the metadata
> log, will materialize the entries in the log into a metadata cache
> This wording confused me because I assumed that "replica" was a formal
> term and only (non-Active) Controllers are formally "replicas" of the
> metadata log -- Kafka brokers would be clients that read the log and
> then use the data for their own purpose as opposed to formally being
> replicas with this understanding of the term "replica".  Is that
> correct, and if so, maybe replace "replica" with "client"?
>

In KIP-595 we have two types of replicas: voters and observers. Voter
replicas are Kafka Controllers and one of them will become the
Active Controller. Observer replicas fetch from the log and attempt to
keep up with the LEO of the Active Controller. I think you can
consider all of them as "client" of the replicated log.

>
> <<< The type of in-memory state machines what we plan to implement
> >>> The type of in-memory state machines that we plan to implement
> nit
>

Done.

>
> <<< doesn't map very well to an key and offset based clean up policy.
> >>> doesn't map very well to a key and offset based clean up policy.
> nit
>

Done.

>
> <<< When starting a broker either because it is a new broker, a broker
> was upgraded or a failed broker is restarting. Loading the state
> represented by the __cluster_metadata topic partition is required
> before the broker is available
> >>> When starting a broker either because it is a new broker or it is 
> >>> restarting, loading the state represented by the __cluster_metadata topic 
> >>> partition is required before the broker is available.
> Reword for simplicity and clarity?
>

Done.

>
> <<< With snapshot based of the in-memory state Kafka can be much more 
> aggressive
> >>> By taking and transmitting a snapshot of the in-memory state as described 
> >>> below Kafka can be much more aggressive
> Tough to refer to the concept of snapshot here without having
> described what it is, so refer to "as described below" to help orient
> the reader?
>

Made some changes to these sentences. I agree that fully understanding
parts of the motivated section requires reading the rest of the
document. I wanted to make sure that we had this in the motivation
section.

>
> <<< In the future this mechanism will also be useful for
> high-throughput topic partitions like the Group Coordinator and
> Transaction Coordinator.
> >>> In the future this mechanism may also be useful for high-throughput topic 
> >>> partitions like the Group Coordinator and Transaction Coordinator.
> Tough to say "will" when that is an assumption that would depend on a KIP?
>

Yeah. Changed it.

>
> <<< __cluster_metadata topic partition then the ... Kafka Controller will
> need to replicate 3.81 MB to each broker in the cluster (10) or 38.14
> MB.
> It might be good to append a sentence that explicitly states how much
> data is replicated for the delta/event -- right now it is implied to
> be very small, but that's kind of like leaving the punch line to a
> joke implied :-)
>

Thanks. I updated the example and added numbers for the events/deltas case.
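
For reference, the arithmetic behind that example is roughly as
follows (the delta figure is a placeholder, not the KIP's exact
number):

  full state : ~3.81 MB per broker x 10 brokers = ~38.14 MB replicated
  delta/event: one small record (well under 1 KB) x 10 brokers = a few KB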


>
> <<< Follower and observer replicas fetch the snapshots from the leader
> they attempt to fetch an offset from the leader and the leader doesn’t
> have that offset in the log
> >>> Follower and observer replicas fetch a snapshot from the leader when they 
> >>> attempt to fetch an offset from the leader and the leader doesn’t have 
> >>> that offset in the log
> nit
>

Done.
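
For readers following along, the flow that sentence describes is
roughly this (simplified, field names approximate):

  1. The follower or observer sends Fetch with fetchOffset = X.
  2. The leader sees that X is less than its log begin offset, so
     instead of records it returns the id (end offset and epoch) of a
     snapshot the replica should fetch.
  3. The replica issues FetchSnapshot for that snapshot id, possibly
     over several requests if the snapshot exceeds the response size
     limit.
  4. Once the snapshot is fully fetched and loaded, the replica
     resumes Fetch starting at the snapshot's end offset.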

>
> >>> Generating and loading the snapshot will be delegated to the Kafka 
> >>> Controller.
> >>> The Kafka Controller will notify the Kafka Raft client when it has 
> >>> generated a snapshot and up to which offset is included in the snapshot.
> >>> The Kafka Raft client will notify the Kafka Controller when a new 
> >>> snapshot has been fetched from the leader.
> This paragraph confuses me.  What is the "Kafka Raft client" -- is
> this the broker? Or is it some other subsystem (or all other
> subsystems aside from log 

Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-07-27 Thread Ron Dagostino
Hi Jose.  Thanks for the KIP.  Here are some questions and some nit corrections.

<<< In KIP-500 the Kafka Controller, which is the quorum leader from
KIP-595, will materialize the entries in the metadata log into memory.
Technically I think the quorum leader is referred to as the Active
Controller in KIP-500.  Maybe replace "Kafka Controller" with "Active
Controller"?  I think the term "Kafka Controller" is fine as used
throughout the rest of the KIP to refer to the entire thing, but when
referring specifically to the leader I think "Active Controller" is
the term that is defined in KIP-500.


<<< Each broker in KIP-500, which will be a replica of the metadata
log, will materialize the entries in the log into a metadata cache
This wording confused me because I assumed that "replica" was a formal
term and only (non-Active) Controllers are formally "replicas" of the
metadata log -- Kafka brokers would be clients that read the log and
then use the data for their own purpose as opposed to formally being
replicas with this understanding of the term "replica".  Is that
correct, and if so, maybe replace "replica" with "client"?


<<< The type of in-memory state machines what we plan to implement
>>> The type of in-memory state machines that we plan to implement
nit


<<< doesn't map very well to an key and offset based clean up policy.
>>> doesn't map very well to a key and offset based clean up policy.
nit


<<< When starting a broker either because it is a new broker, a broker
was upgraded or a failed broker is restarting. Loading the state
represented by the __cluster_metadata topic partition is required
before the broker is available
>>> When starting a broker either because it is a new broker or it is 
>>> restarting, loading the state represented by the __cluster_metadata topic 
>>> partition is required before the broker is available.
Reword for simplicity and clarity?


<<< With snapshot based of the in-memory state Kafka can be much more aggressive
>>> By taking and transmitting a snapshot of the in-memory state as described 
>>> below Kafka can be much more aggressive
Tough to refer to the concept of snapshot here without having
described what it is, so refer to "as described below" to help orient
the reader?


<<< In the future this mechanism will also be useful for
high-throughput topic partitions like the Group Coordinator and
Transaction Coordinator.
>>> In the future this mechanism may also be useful for high-throughput topic 
>>> partitions like the Group Coordinator and Transaction Coordinator.
Tough to say "will" when that is an assumption that would depend on a KIP?


<<< Follower and observer replicas fetch the snapshots from the leader
they attempt to fetch an offset from the leader and the leader doesn’t
have that offset in the log
>>> Follower and observer replicas fetch a snapshot from the leader when they 
>>> attempt to fetch an offset from the leader and the leader doesn’t have that 
>>> offset in the log
nit


>>> Generating and loading the snapshot will be delegated to the Kafka 
>>> Controller.
>>> The Kafka Controller will notify the Kafka Raft client when it has 
>>> generated a snapshot and up to which offset is included in the snapshot.
>>> The Kafka Raft client will notify the Kafka Controller when a new snapshot 
>>> has been fetched from the leader.
This paragraph confuses me.  What is the "Kafka Raft client" -- is
this the broker? Or is it some other subsystem (or all other
subsystems aside from log replication) within the Controller?  Has
this been defined somewhere?  If so it would be good to refer to that
definition.  (Actually, now that I've read further down, I think you
refer to this as "Kafka Raft" later in the KIP; a reference to these
later sections or naming it Kafka Raft Client later on would have
helped me avoid confusion -- I searched the doc for raft client rather
than kafka raft, so I missed this when I searched.)


<<< The Kafka Controller will notify the Kafka Raft client when it has
finished generating a new snapshots.
Same comment about "Kafka Raft client".


<<< It is safe for the log to truncate a prefix of the log up to the
latest snapshot.
"log to truncate a prefix of the log" -- That first mention of "log"
needs to be something else I assume -- LogManager maybe?


<<< In the example above, if the Kafka topic partition leader receives
a fetch request with an offset and epoch greater than or equal to the
log begin offset (x, a)
<<< In the example above, offset=x, epoch=a does not appear in the
diagram because it is before the log begin offset (x+1, b)   If the
Kafka topic partition leader receives a fetch request with an offset
and epoch greater than or equal to the LBO
Maybe add an explicit comment that offset=x, epoch=a does not appear
in the diagram because it is before the LBO of (x+1, b)?  Also need to
fix the LBO reference (currently incorrectly stated as (x, a)).


<<< LBO can be increase/set to an offset X if the following is true:
<<< 2. One of the following is true:
Do the pair of conditions in (2) only apply to the leader/Active Controller?


<<< The broker will delete any snapshot with a latest offset and epoch