Re: [VOTE] KIP-630: Kafka Raft Snapshot

2020-10-12 Thread Jose Garcia Sancio
Thanks everyone for the votes. KIP-630 has been accepted.

Binding: Guozhang, Jason and Jun
Non-binding: Ron and Lucas

On Fri, Oct 9, 2020 at 4:33 PM Jose Garcia Sancio  wrote:
>
> Thanks for the votes Jun, Jason, Ron, Lucas and Guozhang.
>
> Thanks for the feedback Ron and Jun.
>
> Agree with your comments, Ron. I have updated those configurations to
> metadata.snapshot.min.changed_records.ratio and
> metadata.snapshot.min.new_records.size. I had thought of using "cleanable"
> to keep it consistent with the compaction policy configuration, but
> snapshots are different enough that this consistency is not needed.
>
> Jun, I missed the incorrect mention of OFFSET_OUT_OF_RANGE. I have
> replaced it with POSITION_OUT_OF_RANGE.
>
> Changes to the KIP are here:
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=39=37
>
> I believe that we have enough votes to accept this KIP. I'll close the
> voting on Monday.
>
> On Mon, Oct 5, 2020 at 2:04 PM Jun Rao  wrote:
> >
> > Hi, Jose,
> >
> > Thanks for the KIP. +1. A couple of minor comments below.
> >
> > 1. The new configuration names suggested by Ron sound reasonable.
> > 2. It seems that OFFSET_OUT_OF_RANGE in the wiki needs to be changed to
> > POSITION_OUT_OF_RANGE.
> >
> > Jun
> >
> > On Mon, Oct 5, 2020 at 9:46 AM Jason Gustafson  wrote:
> >
> > > +1 Thanks for the KIP!
> > >
> > > -Jason
> > >
> > > On Mon, Oct 5, 2020 at 9:03 AM Ron Dagostino  wrote:
> > >
> > > > Thanks for the KIP, Jose.  +1 (non-binding) from me.
> > > >
> > > > I do have one comment/confusion.
> > > >
> > > > Upon re-reading the latest version, I am confused about the name of
> > > > the proposed "metadata.snapshot.min.records" config.  Is this a size,
> > > > or is it a count?  I think it is about a size but want to be sure.  I
> > > > also wonder if it is about changes (updates/deletes) rather than just
> > > > additions/accretions, or is it independent of that?
> > > >
> > > > I'm also unclear about the definition of the
> > > > "metadata.snapshot.min.cleanable.ratio" config -- is that a ratio of a
> > > > *number* of new records to the number of snapshot records?  Or is it a
> > > > *size* ratio?  I think it is a ratio of numbers of records rather than
> > > > a ratio of sizes.  I think this one is also about changes
> > > > (updates/deletes) rather than just additions/accretions.
> > > >
> > > > I'm wondering if we can be clearer with the names of these two configs
> > > > to make their definitions more apparent.  For example, assuming
> > > > certain definitions as mentioned above:
> > > >
> > > > metadata.snapshot.min.new_records.size -- the minimum size of new
> > > > records required before a snapshot can occur
> > > > metadata.snapshot.min.change_records.ratio -- the minimum ratio of the
> > > > number of change (i.e. not simply accretion) records to the number of
> > > > records in the last snapshot (if any) that must be achieved before a
> > > > snapshot can occur.
> > > >
> > > > For example, if there is no snapshot yet, then ".new_records.size"
> > > > must be written before a snapshot is allowed.  If there is a snapshot
> > > > with N records, then before a snapshot is allowed both
> > > > ".new_records.size" must be written and ".change_records.ratio" must
> > > > be satisfied such that the number of changes (not accretions) divided
> > > > by N meets the ratio.
> > > >
> > > > Ron
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Oct 2, 2020 at 8:14 PM Lucas Bradstreet 
> > > > wrote:
> > > > >
> > > > > Thanks for the KIP! Non-binding +1
> > > > >
> > > > > On Fri, Oct 2, 2020 at 3:30 PM Guozhang Wang 
> > > wrote:
> > > > >
> > > > > > Thanks Jose! +1 from me.
> > > > > >
> > > > > > On Fri, Oct 2, 2020 at 3:18 PM Jose Garcia Sancio <
> > > > jsan...@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I would like to start a vote on KIP-630.
> > > > > > >
> > > > > > > KIP: https://cwiki.apache.org/confluence/x/exV4CQ
> > > > > > > Discussion Thread:
> > > > > > >
> > > > > > >
> > > > > >
> > > >
> > > https://lists.apache.org/thread.html/r9468d1f276385695a2d6d48f6dfbdc504c445fc5745aaa606d138fed%40%3Cdev.kafka.apache.org%3E
> > > > > > >
> > > > > > > Thank you
> > > > > > > --
> > > > > > > -Jose
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > >
> > >
>
>
>
> --
> -Jose



-- 
-Jose


Re: [VOTE] KIP-630: Kafka Raft Snapshot

2020-10-09 Thread Jose Garcia Sancio
Thanks for the votes Jun, Jason, Ron, Lucas and Guozhang.

Thanks for the feedback Ron and Jun.

Agree with your comments, Ron. I have updated those configurations to
metadata.snapshot.min.changed_records.ratio and
metadata.snapshot.min.new_records.size. I had thought of using "cleanable"
to keep it consistent with the compaction policy configuration, but
snapshots are different enough that this consistency is not needed.

Jun, I missed the incorrect mention of OFFSET_OUT_OF_RANGE. I have
replaced it with POSITION_OUT_OF_RANGE.

Changes to the KIP are here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=39=37

I believe that we have enough votes to accept this KIP. I'll close the
voting on Monday.

On Mon, Oct 5, 2020 at 2:04 PM Jun Rao  wrote:
>
> Hi, Jose,
>
> Thanks for the KIP. +1. A couple of minor comments below.
>
> 1. The new configuration names suggested by Ron sound reasonable.
> 2. It seems that OFFSET_OUT_OF_RANGE in the wiki needs to be changed to
> POSITION_OUT_OF_RANGE.
>
> Jun
>
> On Mon, Oct 5, 2020 at 9:46 AM Jason Gustafson  wrote:
>
> > +1 Thanks for the KIP!
> >
> > -Jason
> >
> > On Mon, Oct 5, 2020 at 9:03 AM Ron Dagostino  wrote:
> >
> > > Thanks for the KIP, Jose.  +1 (non-binding) from me.
> > >
> > > I do have one comment/confusion.
> > >
> > > Upon re-reading the latest version, I am confused about the name of
> > > the proposed "metadata.snapshot.min.records" config.  Is this a size,
> > > or is it a count?  I think it is about a size but want to be sure.  I
> > > also wonder if it is about changes (updates/deletes) rather than just
> > > additions/accretions, or is it independent of that?
> > >
> > > I'm also unclear about the definition of the
> > > "metadata.snapshot.min.cleanable.ratio" config -- is that a ratio of a
> > > *number* of new records to the number of snapshot records?  Or is it a
> > > *size* ratio?  I think it is a ratio of numbers of records rather than
> > > a ratio of sizes.  I think this one is also about changes
> > > (updates/deletes) rather than just additions/accretions.
> > >
> > > I'm wondering if we can be clearer with the names of these two configs
> > > to make their definitions more apparent.  For example, assuming
> > > certain definitions as mentioned above:
> > >
> > > metadata.snapshot.min.new_records.size -- the minimum size of new
> > > records required before a snapshot can occur
> > > metadata.snapshot.min.change_records.ratio -- the minimum ratio of the
> > > number of change (i.e. not simply accretion) records to the number of
> > > records in the last snapshot (if any) that must be achieved before a
> > > snapshot can occur.
> > >
> > > For example, if there is no snapshot yet, then ".new_records.size"
> > > must be written before a snapshot is allowed.  If there is a snapshot
> > > with N records, then before a snapshot is allowed both
> > > ".new_records.size" must be written and ".change_records.ratio" must
> > > be satisfied such that the number of changes (not accretions) divided
> > > by N meets the ratio.
> > >
> > > Ron
> > >
> > >
> > >
> > >
> > >
> > > On Fri, Oct 2, 2020 at 8:14 PM Lucas Bradstreet 
> > > wrote:
> > > >
> > > > Thanks for the KIP! Non-binding +1
> > > >
> > > > On Fri, Oct 2, 2020 at 3:30 PM Guozhang Wang 
> > wrote:
> > > >
> > > > > Thanks Jose! +1 from me.
> > > > >
> > > > > On Fri, Oct 2, 2020 at 3:18 PM Jose Garcia Sancio <
> > > jsan...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I would like to start a vote on KIP-630.
> > > > > >
> > > > > > KIP: https://cwiki.apache.org/confluence/x/exV4CQ
> > > > > > Discussion Thread:
> > > > > >
> > > > > >
> > > > >
> > >
> > https://lists.apache.org/thread.html/r9468d1f276385695a2d6d48f6dfbdc504c445fc5745aaa606d138fed%40%3Cdev.kafka.apache.org%3E
> > > > > >
> > > > > > Thank you
> > > > > > --
> > > > > > -Jose
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > >
> >



-- 
-Jose


[VOTE] KIP-630: Kafka Raft Snapshot

2020-10-02 Thread Jose Garcia Sancio
Hi all,

I would like to start a vote on KIP-630.

KIP: https://cwiki.apache.org/confluence/x/exV4CQ
Discussion Thread:
https://lists.apache.org/thread.html/r9468d1f276385695a2d6d48f6dfbdc504c445fc5745aaa606d138fed%40%3Cdev.kafka.apache.org%3E

Thank you
-- 
-Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-10-02 Thread Jose Garcia Sancio
I am going to propose that we take a vote on this KIP.

Thank you Jason, Ron, Jun and Guozhang for the feedback and discussion.

On Fri, Oct 2, 2020 at 3:11 PM Jose Garcia Sancio  wrote:
>
> Thank you Jun!
>
> Changes:
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=37=36
>
> > 41. Perhaps metadata.snapshot.min.records.size can just
> > be metadata.snapshot.min.records?
>
> Sounds good to me. Done.
>
> > 42. It's probably fine to change maxBytes for Fetch in a separate PR. I
> > brought this up since this KIP is changing the Fetch request.
>
> Okay. Minor nit. KIP-630 changed the Fetch response. We will bump the
> Fetch request version because it is required but the KIP doesn't make
> any changes to the fields in the Fetch request.
>
> -Jose



-- 
-Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-10-02 Thread Jose Garcia Sancio
Thank you Jun!

Changes:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=37=36

> 41. Perhaps metadata.snapshot.min.records.size can just
> be metadata.snapshot.min.records?

Sounds good to me. Done.

> 42. It's probably fine to change maxBytes for Fetch in a separate PR. I
> brought this up since this KIP is changing the Fetch request.

Okay. Minor nit. KIP-630 changed the Fetch response. We will bump the
Fetch request version because it is required but the KIP doesn't make
any changes to the fields in the Fetch request.

-Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-10-02 Thread Jose Garcia Sancio
I read through KIP-630 and made the following minor changes.

https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=36=35

-- 
-Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-10-01 Thread Jose Garcia Sancio
Comments below.

Here are the change to the KIP:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=34=33

> 41. That's a good point. With compacted topic, the cleaning won't be done
> until the active segment rolls. With snapshots, I guess we don't have this
> restriction? So, it would be useful to guard against too frequent
> snapshotting. Does the new proposal address this completely? If the
> snapshot has only 1 record, and the new record keeps updating the same key,
> does that still cause the snapshot to be generated frequently?

That is true. In addition to metadata.snapshot.min.cleanable.ratio we
can add the following configuration:

metadata.snapshot.min.records.size - This is the minimum number of
bytes in the replicated log between the latest snapshot and the
high-watermark needed before generating a new snapshot. The default is
20MB.

Both configurations need to be satisfied before generating a new
snapshot. I have updated the KIP.

> 42. One of the reasons that we added the per partition limit is to allow
> each partition to make relatively even progress during the catchup phase.
> This helps kstreams by potentially reducing the gap between stream time
> from different partitions. If we can achieve the same thing without the
> partition limit, it will be fine too. "3. For the remaining partitions send
> at most the average max bytes plus the average remaining bytes." Do we
> guarantee that request level max_bytes can be filled up when it can? Could
> we document how we distribute the request level max_bytes to partitions in
> the KIP?

I want to allow some flexibility in the implementation. How about the
following update to the FetchSnapshot Request Handling section:

3. Send the bytes in the snapshot from Position. If there are multiple
partitions in the FetchSnapshot request, then the leader will evenly
distribute the number of bytes sent across all of the partitions. The
leader will not send more bytes in the response than ResponseMaxBytes,
the minimum of MaxBytes in the request and the value configured in
replica.fetch.response.max.bytes.
  a. Each topic partition is guaranteed to receive at least the
average ResponseMaxBytes (ResponseMaxBytes divided by the number of
partitions in the request) if that snapshot has enough bytes remaining.
  b. If there are topic partitions with snapshots that have fewer
remaining bytes than the average ResponseMaxBytes, then the unused
bytes may be used to send snapshot bytes for other topic partitions.

I should also point out that in practice for KIP-630 this request will
only have one topic partition (__cluster_metadata-0).

Also note that FetchSnapshot sends bytes, not records, so unlike Fetch
there is no requirement that the response contain at least one record.

>Also, should we change Fetch accordingly?

If we want to make this change I think we should do this in another
KIP. What do you think?

> 46. If we don't have IBP, how do we make sure that a broker doesn't
> issue FetchSnapshotRequest when the receiving broker hasn't been upgraded
> yet?

For a broker to send a FetchSnapshotRequest it means that it received
a FetchResponse that contained a SnapshotId field. For the leader to
send a SnapshotId in the FetchResponse it means that the leader is
executing code that knows how to handle FetchSnapshotRequests.

The inverse is also true. For the follower to receive a SnapshotId in
the FetchResponse it means that it sent the FetchRequest to the leader
of the __cluster_metadata-0 topic partition. Only the KafkaRaftClient
will send that fetch request.

After writing the above, I see what you are saying. The broker needs
to know if it should enable the KafkaRaftClient and send FetchRequests
to the __cluster_metadata-0 topic partition. I think that there is
also a question of how to perform a rolling migration of a cluster
from ZK to KIP-500. I think we will write a future KIP that documents
this process.

Thanks for your help here. For now, I'll mention that we will bump the
IBP. The new wording for the "Compatibility, Deprecation, and
Migration Plan" section:

This KIP is only implemented for the internal topic
__cluster_metadata. The inter-broker protocol (IBP) will be increased
to indicate that all of the brokers in the cluster support KIP-595 and
KIP-630.

Thanks,
-Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-10-01 Thread Jose Garcia Sancio
Thanks for the email Guozhang.

> Thanks for the replies and the KIP updates. Just want to clarify one more
> thing regarding my previous comment 3): I understand that when a snapshot
> has completed loading, then we can use it in our handling logic of vote
> request. And I understand that:
>
> 1) Before a snapshot has been completely received (e.g. if we've only
> received a subset of the "chunks"), then we just handle vote requests "as
> like" there's no snapshot yet.
>
> 2) After a snapshot has been completely received and loaded into main
> memory, we can handle vote requests "as of" the received snapshot.
>
> What I'm wondering if, in between of these two synchronization barriers,
> after all the snapshot chunks have been received but before it has been
> completely parsed and loaded into the memory's metadata cache, if we
> received a request (note they may be handled by different threads, hence
> concurrently), what should we do? Or are you proposing that the
> fetchSnapshot request would also be handled in that single-threaded raft
> client loop so it is in order with all other requests, if that's the case
> then we do not have any concurrency issues to worry, but then on the other
> hand the reception of the last snapshot chunk and loading them to main
> memory may also take long time during which a client may not be able to
> handle any other requests.

Yes. The FetchSnapshot request and response handling will be performed
by the KafkaRaftClient in a single threaded fashion. The
KafkaRaftClient doesn't need to load the snapshot to know what state
it is in. It only needs to scan the "checkpoints" folder, load the
quorum state file and know the LEO of the replicated log. I would
modify 2) above to the following:

3) After the snapshot has been validated by
  a) Fetching all of the chunks
  b) Verifying the CRC of the records in the snapshot
  c) Atomically moving the temporary snapshot to the permanent location

After 3.c), the KafkaRaftClient only needs to scan and parse the
filenames in the directory called "checkpoints" to find the
largest/latest permanent snapshot.
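As a rough illustration of step 3.c (the directory and file names here are
placeholders, not the actual on-disk layout), the move can be made atomic
with java.nio so that the raft client never observes a partially written
snapshot:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    final class SnapshotInstaller {
        // Hypothetical step 3.c: atomically publish a fully fetched and CRC-validated
        // temporary snapshot into the permanent "checkpoints" directory.
        static Path install(Path tmpSnapshot, Path checkpointsDir, String snapshotFileName) throws IOException {
            Path target = checkpointsDir.resolve(snapshotFileName);
            return Files.move(tmpSnapshot, target, StandardCopyOption.ATOMIC_MOVE);
        }
    }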

As you point out in 1) before 3.c) the KafkaRaftClient, in regards to
leader election, will behave as if the temporary snapshot didn't
exists.

The loading of the snapshot will be done by the state machine (Kafka
Controller or Metadata Cache) and it can perform this on a different
thread. The KafkaRaftClient will provide an API for finding and
reading the latest valid snapshot stored locally.

Are you also concerned that the snapshot could have been corrupted after 3.c?

I also updated the "Changes to leader Election" section to make this a
bit clearer.

Thanks,
Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-10-01 Thread Jose Garcia Sancio
Thank you for the quick response Jun. Excuse the delayed response but
I wanted to confirm some things regarding IBP. See comments below.

Here are my changes to the KIP:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=30=28

> 40. LBO: Code wise, logStartOffset is used in so many places like Log,
> ReplicaManager, LogCleaner, ReplicaFetcher, checkpoint files, etc. I am not
> if it's worth renaming in all those places. If the main concern is to
> avoid confusion, we can just always spell out logStartOffset.

Done. Keeping it as LogStartOffset is better. I was also concerned
with external tools that may be generating code from the JSON schema.

> 41. metadata.snapshot.min.cleanable.ratio: Since the snapshot could be
> empty initially, it's probably better to define the ratio as new_data_size
> / (new_data_size + snapshot_size). This avoids the dividing by zero issue
> and is also consistent with the cleaner ratio definition for compacted
> topics.

I am assuming that snapshot_size is the size of the largest snapshot
on disk, is this correct? If we use this formula then we will generate
snapshots very quickly when the snapshot on disk is empty or very small.

In general what we care about is whether the replicated log has a lot of
records that delete or update records in the snapshot. I was thinking of
something along the lines of the following formula:

(size of deleted snapshot records + size of updated records) / (total
size of snapshot), when the total size of the snapshot is greater than zero
0, when the total size of the snapshot is zero

This means that in the extreme case where the replicated log only
contains "addition" records, we never generate a snapshot. I think
this is the desired behavior since generating a snapshot will consume
disk bandwidth without saving disk space. What do you think?
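As a small sketch of that formula (names are illustrative only), including
the zero-size case:

    final class ChangedRecordsRatio {
        // Ratio of snapshot records that the replicated log deletes or updates,
        // relative to the total size of the latest snapshot. Returns 0 when the
        // snapshot is empty, so a log of pure additions never triggers a snapshot.
        static double ratio(long deletedRecordBytes, long updatedRecordBytes, long snapshotTotalBytes) {
            if (snapshotTotalBytes <= 0) {
                return 0.0;
            }
            return (double) (deletedRecordBytes + updatedRecordBytes) / snapshotTotalBytes;
        }
    }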

>
> 42. FetchSnapshotRequest: Since this is designed to fetch more than one
> partition, it seems it's useful to have a per-partition maxBytes, in
> addition to the request level maxBytes, just like a Fetch request?

Yeah, we have debated this in another thread from Jason. The argument
is that MaxBytes at the top level is all that we need if we implement
the following heuristic:

1. Compute the average max bytes per partition by dividing the max by
the number of partitions in the request.
2. For all of the partitions with remaining bytes less than this
average max bytes, then send all of those bytes and sum the remaining
bytes.
3. For the remaining partitions send at most the average max bytes
plus the average remaining bytes.

Note that this heuristic will only be performed once and not at worst
N times for N partitions.
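Here is a minimal sketch of that heuristic (hypothetical types and names; it
interprets "sum the remaining bytes" as the unused part of the per-partition
average, and assumes at least one partition in the request):

    import java.util.LinkedHashMap;
    import java.util.Map;

    final class FetchSnapshotMaxBytes {
        static Map<String, Long> allocate(Map<String, Long> remainingPerPartition, long maxBytes) {
            long average = maxBytes / remainingPerPartition.size();
            Map<String, Long> allocation = new LinkedHashMap<>();

            // Step 2: small snapshots get everything they have left; track the unused budget.
            long unused = 0;
            int largePartitions = 0;
            for (Map.Entry<String, Long> entry : remainingPerPartition.entrySet()) {
                if (entry.getValue() <= average) {
                    allocation.put(entry.getKey(), entry.getValue());
                    unused += average - entry.getValue();
                } else {
                    largePartitions++;
                }
            }

            // Step 3: the rest get at most the average plus their share of the unused budget.
            long extra = largePartitions == 0 ? 0 : unused / largePartitions;
            for (Map.Entry<String, Long> entry : remainingPerPartition.entrySet()) {
                if (entry.getValue() > average) {
                    allocation.put(entry.getKey(), Math.min(entry.getValue(), average + extra));
                }
            }
            return allocation;
        }
    }

Since the divisions round down, the total allocated this way never exceeds
the request-level MaxBytes.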

What do you think? Besides consistency with Fetch requests, is there
another reason to have MaxBytes per partition?

> 43. FetchSnapshotResponse:
> 43.1 I think the confusing part for OFFSET_OUT_OF_RANGE is
> that FetchSnapshotRequest includes EndOffset. So, OFFSET_OUT_OF_RANGE seems
> to suggest that the provided EndOffset is wrong, which is not the intention
> for the error code.

Yeah. Added a new error called POSITION_OUT_OF_RANGE.

> 43.1 Position field seems to be the same as the one in
> FetchSnapshotRequest. If we have both, should the requester verify the
> consistency between two values and what should the requester do if the two
> values don't match?

Yeah the Position in the response will be the same value as the
Position in the request. I was thinking of only verifying Position
against the state on the temporary snapshot file on disk. If Position
is not equal to the size of the file then reject the response and send
another FetchSnapshot request.
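A tiny sketch of that check (the temporary file path is hypothetical):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    final class FetchSnapshotResponseCheck {
        // Accept the response only if Position matches the bytes already written to
        // the temporary snapshot file; otherwise reject it and re-send FetchSnapshot.
        static boolean positionMatches(Path tmpSnapshot, long position) throws IOException {
            return Files.size(tmpSnapshot) == position;
        }
    }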

> 44. metric: Would a metric that captures the lag in offset between the last
> snapshot and the logEndOffset be useful?

Yes. How about the difference between the last snapshot offset and the
high-watermark? Snapshots can only be created up to the high-watermark.

Added this metric. Let me know if you still think we need a metric for
the difference between the largest snapshot end offset and the
high-watermark.

> 45. It seems the KIP assumes that every voter (leader and follower) and
> observer has a local replicated log for __cluster_metadata. It would be
> useful to make that clear in the overview section.

Updated the overview section. I think that this decision affects the
section "Changes to Leader Election". That section should not affect
observers since they don't participate in leader elections. It also
affects the section "Validation of Snapshot and Log" but it should be
possible to fix that section if observers don't have the replicated
log on disk.

> 46. Does this KIP cover upgrading from older versions of Kafka? If so, do
> we need IBP to guard the usage of modified FetchRequest and the new
> FetchSnapshotRequest? If not, could we make it clear that upgrading will be
> covered somewhere else?

In short, I don't think we need to increase the IBP. When we implement
snapshots for other topics like __consumer_offset and 

Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-09-29 Thread Jose Garcia Sancio
Hi Jun and Colin,

Some comments below.

> 62.3 We added some configs in KIP-595 prefixed with "quorum" and we plan to
> add some controller specific configs prefixed with "controller". KIP-630
> plans to add some other controller specific configs with no prefix. Should
> we standardize all controller specific configs with the same prefix?

I agree that consistency in all of the new properties is really
important to improve the user's experience with Kafka. After some
discussion in the review of KIP-630, the configuration names start
with "metadata".

metadata.snapshot.min.cleanable.ratio
metadata.lbo.lag.time.max.ms

The reason for the change is that this configuration affects both the
controller component and the metadata cache component as users of the
"cluster metadata" topic partition. I think the new names matches
Kafka's existing configuration pattern for the transaction and
consumer offset topic partitions.

Thanks!
-- 
-Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-09-28 Thread Jose Garcia Sancio
Hi Guozhang,

Thanks for your feedback. It was very helpful. See my comments below.

Changes to the KIP:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=28=27

On Sun, Sep 27, 2020 at 9:02 PM Guozhang Wang  wrote:
>
> Hello Jose,
>
> Thanks for the KIP. Overall it looks great. I have a few meta / minor
> question, or maybe just clarifications below:
>
> Meta:
>
> 1. I want to clarify that if only the active controller would generate
> snapshots, OR would any voter in the quorum would generate snapshots, OR
> would even observers generate snapshots? Originally I thought it was the
> latter case, but I think reading through the doc I got confused by some
> paragraphs. E.g. you mentioned snapshots are generated by the Controller
> module, and observers would not have that module.

Sorry for the confusion and inconsistency here. Every replica of the
cluster metadata topic partition will generate a snapshot. That
includes the voters (leader and followers) and observers. In this KIP
the leader is the Active Controller, the voters are the Kafka
Controllers and the observers are the Metadata Cache.

I went through the KIP again and made sure to enumerate both Kafka
Controllers and Metadata Cache when talking about snapshot generation
and loading.

I renamed the new configurations to be prefixed by metadata instead of
controller.

I moved the terminology section to the top.

>
> 2. Following on Jun's previous comment: currently the __consumer_metadata
> log is replicated on ALL brokers since all voters and observers would
> replicate that topic. I know this may be out of the scope of this KIP but I
> think maybe only letting the voters to replicate (and periodically
> truncate) the log while observers only maintain the in-memory state and
> snapshots is a good trade-off here, assuming snapshot loading is relatively
> fast.

This is a good idea and optimization. It would save a write. I think
we need to think about the implications for KIP-642, the dynamic quorum
reassignment KIP, if we end up allowing observers to get "promoted" to
voters.

>
> 3. When a raft client is in the middle of loading a snapshot, should it
> reject any vote / begin-/end-/describe-quorum requests at the time? More
> generally, while a snapshot is being loaded, how should we treat the
> current state of the client when handling Raft requests.

Re: requesting votes and granting votes.

I think the section "Changes to Leader Election" has been improved
since your review. It mentions that the raft client needs to
look at:

1. latest/largest snapshot epoch and end offset
2. the LEO of the replicated log

The voters should use the latest/largest of these two during the
election process.
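For illustration, a sketch of that comparison (hypothetical names; it assumes
the usual Raft ordering of comparing epochs first and end offsets second):

    final class LatestLogState {
        // Pick the later of the latest snapshot's (epoch, end offset) and the
        // replicated log's (last epoch, LEO) when requesting or granting votes.
        static long[] latest(long snapshotEpoch, long snapshotEndOffset, long logLastEpoch, long logEndOffset) {
            boolean snapshotIsLater = snapshotEpoch > logLastEpoch
                || (snapshotEpoch == logLastEpoch && snapshotEndOffset > logEndOffset);
            return snapshotIsLater
                ? new long[] {snapshotEpoch, snapshotEndOffset}
                : new long[] {logLastEpoch, logEndOffset};
        }
    }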

Re: quorum state

For KIP-595 and KIP-630 the snapshot doesn't include any quorum
information. This may change in KIP-642.

>
> Minor:
>
> 4."All of the live replicas (followers and observers) have replicated LBO".
> Today the raft layer does not yet maintain LBO across all replicas, is this
> information kept in the controller layer? I'm asking because I do not see
> relevant docs in KIP-631 and hence a bit confused which layer is
> responsible for bookkeeping the LBOs of all replicas.

This is not minor! :). This should be done in the raft client as part
of the fetch protocol. Note that LBO is just a rename of log start
offset. If the current raft implementation doesn't manage this
information then we will have to implement this as part of
implementing this KIP (KIP-630).

> 5. "Followers and observers will increase their log begin offset to the
> value sent on the fetch response as long as the local Kafka Controller and
> Metadata Cache has generated a snapshot with an end offset greater than or
> equal to the new log begin offset." Not sure I follow this: 1) If observers
> do not generate snapshots since they do not have a Controller module on
> them, then it is possible that observers do not have any snapshots at all
> if they do not get one from the leader, in that case they would never
> truncate the logs locally;

Observers will have a Metadata Cache which will be responsible for
generating snapshots.

> 2) could you clarify on "value sent on the fetch
> response", are you referring to the "HighWatermark", or "LogStartOffset" in
> the schema, or some other fields?

The log begin offset is the same as the log start offset. This KIP
renames that field in the fetch response. I am starting to think that
renaming this field in this KIP is not worth it. What do you think?

>
> 6. The term "SnapshotId" is introduced without definition at first. My
> understanding is it's defined as a combo of <epoch, end offset>, could you
> clarify if this is the case?

Good point. I added this sentence to the Snapshot Format section and
terminology section:

"Each snapshot is uniquely identified by a SnapshotId, the epoch and
end offset of the records in the replicated log included in the
snapshot."

> BTW I think the term "endOffset" is a term
> 

Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-09-28 Thread Jose Garcia Sancio
Thanks for the reply Jun. Some comments below.

Here are the changes:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=27=26

> 20. Good point on metadata cache. I think we need to make a decision
> consistently. For example, if we decide that dedicated voter nodes don't
> serve metadata requests, then we don't need to expose the voters host/port
> to the client. Which KIP should make this decision?

Makes sense. My opinion is that this should be addressed in KIP-631
since I think exposing this information is independent of
snapshotting.

Note that I think there is a long term goal to make the
__cluster_metadata topic partition readable by a Kafka Consumer but we
can address that in a future KIP.

> 31. controller.snapshot.minimum.records: For a compacted topic, we use a
> ratio instead of the number of records to determine when to compact. This
> has some advantages. For example, if we use
> controller.snapshot.minimum.records and set it to 1000, then it will
> trigger the generation of a new snapshot when the existing snapshot is
> either 10MB or 1GB. Intuitively, the larger the snapshot, the more
> expensive it is to write to disk. So, we want to wait for more data to be
> accumulated before generating the next snapshot. The ratio based setting
> achieves this. For instance, a 50% ratio requires 10MB/1GB more data to be
> accumulated to regenerate a 10MB/1GB snapshot respectively.

I agree. I proposed using a simple algorithm like
"controller.snapshot.minimum.records" since calculating a dirty ratio
may not be straightforward when replicated log records don't map 1:1
to snapshot records. But I think we can implement a heuristic for
this. There is a small complication when generating the first snapshot
but it should be implementable. Here is the latest wording of the
"When to Snapshot" section:

If the Kafka Controller generates a snapshot too frequently then it
can negatively affect the performance of the disk. If it doesn't
generate a snapshot often enough then it can increase the amount of
time it takes to load its state into memory and it can increase the
amount space taken up by the replicated log. The Kafka Controller will
have a new configuration option
controller.snapshot.min.cleanable.ratio.  If the number of snapshot
records that have changed (deleted or modified) between the latest
snapshot and the current in-memory state divided by the total number
of snapshot records is greater than
controller.snapshot.min.cleanable.ratio, then the Kafka Controller
will perform a new snapshot.

Note that new snapshot records don't count against this ratio. If a
new snapshot record was added since that last snapshot then it doesn't
affect the dirty ratio. If a snapshot record was added and then
modified or deleted then it counts against the dirty ratio.

> 32. max.replication.lag.ms: It seems this is specific to the metadata
> topic. Could we make that clear in the name?

Good catch. Done.


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-09-28 Thread Jose Garcia Sancio
Thanks Jason. Some comments below.

> > Generally the number of snapshots on disk will be one. I suspect that
> users will want some control over this. We can add a configuration
> option that doesn't delete, or advance the log begin offset past, the
> N latest snapshots. We can set the default value for this
> configuration to two. What do you think?
>
> I know Zookeeper has a config like this, but I'm not sure how frequently it
> is used. I would probably suggest we pick a good number of snapshots (maybe
> just 1-2) and leave it out of the configs.
>

Sounds good to me. If followers/observers are keeping up with the
Leader, I think the description in section "When to Increase the Log
Begin Offset" will lead to one snapshot on disk in the steady state.

> > We could use the same configuration we have for Fetch but to avoid
> confusion let's add two more configurations for
> "replica.fetch.snapshot.max.bytes" and
> "replica.fetch.snapshot.response.max.bytes".
>
> My vote would probably be to reuse the existing configs. We can add new
> configs in the future if the need emerges, but I can't think of a good
> reason why a user would want these to be different.

Sounds good to me. Removed the configuration from the KIP. Updated the
FetchSnapshot request handling section to mention that the
replica.fetch.response.max.bytes configuration will be used.

> By the way, it looks like the FetchSnapshot schema now has both a partition
> level and a top level max bytes. Do we need both?

Kept the top level MaxBytes and removed the topic partition level MaxBytes.

> > The snapshot epoch will be used when ordering snapshots and more
> importantly when setting the LastFetchedEpoch in the Fetch request. It
> is possible for a follower to have a snapshot and an empty log. In
> this case the follower will use the epoch of the snapshot when setting
> the LastFetchedEpoch in the Fetch request.
>
> Just to be clear, I think it is important to include the snapshot epoch so
> that we /can/ reason about the snapshot state in the presence of data loss.
> However, if we excluded data loss, then this would strictly speaking be
> unnecessary because a snapshot offset would always be uniquely defined
> (since we do not snapshot above the high watermark). Hence it would be safe
> to leave LastFetchedEpoch undefined. Anyway, I think we're on the same page
> about the behavior, just thought it might be useful to clarify the
> reasoning.

Okay. Even though you are correct that the LastFetchedEpoch shouldn't
matter since the follower is fetching committed data, I still think
that the follower should send the epoch of the snapshot as the
LastFetchedEpoch for extra validation on the leader. What do you
think?


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-09-25 Thread Jose Garcia Sancio
Thanks for the detailed feedback Jun.

The changes are here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=25=24

Here is a summary of the change to the KIP:
1. Use end offset for snapshot and snapshot id.
2. Include default for all of the new configuration options.
3. Provide more detail in the response handling for FetchSnapshot

> 20. "Metadata Cache: The component that generates snapshots, reads
> snapshots and reads logs for observer replicas of the topic partition
> __cluster_metadata." It seems this is needed on every broker, not just
> observers?

Yes. I think we need some clarification and consensus here. Some
people are advocating for Kafka brokers to only be observers that
contain just a Metadata Cache, with the Kafka Controllers being
separate nodes that are voters (follower, candidate or leader) and not
observers. Others are advocating for Kafka brokers to be able to host
both the Kafka Controller and the Metadata Cache. In the latter case, if the
Controller and Metadata Cache share the same underlying topic
partition, then we need to make sure that we unify the snapshotting
logic.

I would like to be able to unify the in-memory state for both the
Kafka Controller and the Metadata Cache so that we can share the same
replicated log and snapshot.

> 21. Our current convention is to use exclusive offset for naming
> checkpoint files. For example, a producer snapshot file of 1234.snapshot
> means that the file includes the producer state up to, but not including
> offset 1234. So, we probably want to follow the same convention for the new
> checkpoint file.

Thanks for pointing this out. This sounds good to me. This was a
detail that I was struggling with when reading the replication code.
Updated the KIP. Wherever the offset is exclusive, I renamed it to
"end offset" (EndOffset).

> 22. Snapshot Format: KIP-631 only defines the format for individual
> records. It seems that we need to define the container format here. For
> example, we need to store the length of each record. Also, does the
> snapshot file need a CRC field?

Yes. I have added more information on this. In summary, we are going
to use Kafka's log format version 2. This will give us support for
compression and CRC at the record batch level. The Kafka Controller
and Metadata Cache can control how big they want the batches to be.

> 23. Could we provide the default value for the new
> configs controller.snapshot.minimum.records and max.replication.lag.ms.
> Also, max.replication.lag.ms seems to just control the snapshot frequency
> by time and not directly relate to replication. So, maybe it should be
> called sth like controller.snapshot.minimum.interval.ms?

"max.replication.lag.ms" is very similar to "replica.lag.time.max.ms".
Kafka uses "replica.lag.time.max.ms" to make progress on the
high-watermark when replicas are slow or offline. We want to use
"max.replication.lag.ms" to make progress on the LBO when replicas are
slow or offline. These very similar names are confusing. How about
"replica.lbo.lag.time.max.ms"?

How often snapshotting will happen is determined by
"controller.snapshot.minimum.records".

> 24. "Kafka allows the clients to delete records that are less than a given
> offset by using the DeleteRecords RPC . Those requests will be validated
> using the same logic enumerated above." Hmm, should we allow deleteRecord
> on the metadata topic? If we do, does it trim the snapshot accordingly?

Yeah. After thinking about it some more, I don't think we should
allow DeleteRecords to succeed on the __cluster_metadata partition.
For the error that we return, it looks like our options are the
existing POLICY_VIOLATION (the description for this error is
"Request parameters do not satisfy the configured policy.") or
introducing a new error. I think we should just return
POLICY_VIOLATION, what do you think?

> 25. "The followers of the __cluster_metadata topic partition will
> concurrently fetch the snapshot and replicated log. This means that
> candidates with incomplete snapshots will send a vote request with a
> LastOffsetEpoch of -1 and a LastOffset of -1 no matter the LEO of the
> replicated log." My understanding is that a follower will either fetch from
> the snapshot or the log, but not both at the same time. Could you explain
> how the concurrent part works? Also, what's an incomplete snapshot?

Yes. I rewrote this section based on your comment and Jason's
comments. Let me know if this addresses your concerns.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-630:+Kafka+Raft+Snapshot#KIP630:KafkaRaftSnapshot-ChangestoLeaderElection

>
> 26. FetchRequest:
> 26.1 Handling Fetch Request: I agree with Jason that SnapshotOffsetAndEpoch
> already tells us the next offset to fetch. So, we don't need to
> set NextOffsetAndEpoch in the response.

Agreed. The response will set one or the other. If SnapshotId (field
renamed in the latest version of the KIP) is set then the 

Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-09-24 Thread Jose Garcia Sancio
Thanks for the feedback Jason.

I have made the following changes to the KIP:
1. Better explanation of how followers will manage snapshots and the
replicated log. This includes the necessary changes when granting or
requesting votes.
2. How the Snapshot's epoch will be used for the LastFetchEpoch in the
Fetch request.
3. New configuration options.
4. Changed the Fetch response to match the latest changes in KIP-595.
5. Changed the FetchSnapshot request to include total response max bytes.
6. Changed the FetchSnapshot response to return the snapshot size
instead of the "Continue" field.

Diff:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=24=23

> 1. There is a comment in the proposal which suggests that we will maintain
> multiple snapshots:
>
> > Having multiple snapshots is useful for minimizing re-fetching of the
> snapshot when a new snapshot is generated.
>
> However, the document later says that snapshots get deleted as the LBO
> advances. Just wanted to clarify the intent. Will we generally only have
> one snapshot?

Generally the number of snapshots on disk will be one. I suspect that
users will want some control over this. We can add a configuration
option that doesn't delete, or advance the log begin offset past, the
N latest snapshots. We can set the default value for this
configuration to two. What do you think?

> 2. The proposal says the following:
>
> > During leader election, followers with incomplete or missing snapshot
> will send a vote request and response as if they had an empty log.
>
> Maybe you can help me understand the scenario we're talking about since I'm
> not sure I understand the point of this. If the intent is to not allow such
> a follower to become leader, why would it ever become a candidate? On the
> other hand, if the intent is to still allow it to become leader in some
> disaster scenario, then why would it not use its latest log state? For
> inbound Vote requests, I think it should definitely still consider its
> latest log state when deciding whether to grant a vote.

Conceptually followers will implement this algorithm:
1. Follower sends fetch request
2. Leader replies with snapshot epoch and offset
3. Follower pauses fetch
4. Follower fetches the snapshot
5. Follower resumes fetching by
A. Setting the LBO to the snapshot offset plus one
B. Setting the LEO or fetch offset in the fetch request to the
snapshot offset plus one
C. Using the snapshot epoch as the last fetched epoch in the fetch request.
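For step 5 above, a minimal sketch of the bookkeeping (the field names are
made up for illustration):

    final class FollowerFetchState {
        long logBeginOffset;
        long fetchOffset;
        int lastFetchedEpoch;

        // After fetching a snapshot with offset O and epoch E, resume fetching
        // right after the snapshot (steps 5.A, 5.B and 5.C).
        void resumeAfterSnapshot(long snapshotOffset, int snapshotEpoch) {
            logBeginOffset = snapshotOffset + 1;
            fetchOffset = snapshotOffset + 1;
            lastFetchedEpoch = snapshotEpoch;
        }
    }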

The problem I was trying to address is what is the state of the
follower between bullet 4 and 5? Let's assume that the snapshot fetch
in bullet 4 has an epoch of E and an offset of O. The follower can
have the following state on disk after bullet 4:

1. A snapshot with offset O and epoch E.
2. Many snapshots older/less than offset O and epoch E.
3. A replicated log with LEO older/less than offset O and epoch E.

In this case when the follower grants a vote or becomes a candidate it
should use the latest of all of this which is (1.) the fetched
snapshot with offset O and epoch E.

I updated the KIP to include this description.

> 3. Are we overloading `replica.fetch.max.bytes` for snapshot fetches as
> well? It looks like we are specifying this at the partition level, but it
> might be more useful to track the maximum bytes at the request level. On a
> related note, it might be useful to think through heuristics for balancing
> between the requests in a partition. Unlike fetches, it seems like we'd
> want to complete snapshot loading partition by partition. I wonder if it
> would be simpler for FetchSnapshot to handle just one partition.

We could use the same configuration we have for Fetch but to avoid
confusion let's add two more configurations for
"replica.fetch.snapshot.max.bytes" and
"replica.fetch.snapshot.response.max.bytes".

> 4. It would help if the document motivated the need to track the snapshot
> epoch. Since we are only snapshotting below the high watermark, are you
> thinking about recovering from data loss scenarios?

I added the following paragraph to the KIP:

The snapshot epoch will be used when ordering snapshots and more
importantly when setting the LastFetchedEpoch in the Fetch request. It
is possible for a follower to have a snapshot and an empty log. In
this case the follower will use the epoch of the snapshot when setting
the LastFetchedEpoch in the Fetch request.

>
> 5. Might need to fix the following:
>
> > Otherwise, the leader will respond with the offset and epoch of the
> latest snapshot (y, c) and with the next fetch offset and epoch (y + 1, d)
>
> We ended up renaming the next fetch offset and epoch. I think we should
> just leave it empty in this case. The snapshot offset and epoch seem
> sufficient.

Done. I made some changes to the "Handling Fetch Response" section too.


Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-08-07 Thread Jose Garcia Sancio
Hi Unmesh,

Very cool prototype!

Hi Colin,

The KIP proposes a record called IsrChange which includes the
partition, topic, isr, leader and leader epoch. During normal
operation ISR changes do not result in leader changes. Similarly,
leader changes do not necessarily involve ISR changes. The controller
implementation that uses ZK modeled them together because
1. All of this information is stored in one znode.
2. ZK's optimistic lock requires that you specify the new value completely
3. The change to that znode was being performed by both the controller
and the leader.

None of these reasons are true in KIP-500. Have we considered having
two different records? For example

1. IsrChange record which includes topic, partition, isr
2. LeaderChange record which includes topic, partition, leader and leader epoch.

I suspect that making this change will also require changing the
message AlterIsrRequest introduced in KIP-497: Add inter-broker API to
alter ISR.

Thanks
-Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-08-07 Thread Jose Garcia Sancio
Thanks for your feedback Jun.

Here are my changes to the KIP:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=21=20

My comments are below...

On Wed, Aug 5, 2020 at 1:59 PM Jun Rao  wrote:
>
> Hi, Jose,
>
> Thanks for the KIP. A few comments blow.
>
> 10. I agree with Jason that it's useful to document the motivation a bit
> clearer. Regarding semantic/performance, one benefit of snapshotting is
> that it allows changes to be encoded incrementally instead of using the
> full post image. For example, in KIP-631, each partition has multiple
> fields like assigned replicas, leader, epoch, isr, etc. If only isr is
> changed, the snapshotting approach allows the change to be represented with
> just the new value in isr. Compaction will require all existing fields to
> be included in order to represent just an isr change. This is just because
> we can customize the combining logic with snapshotting.
>

Yes. Right now the IsrChange record from KIP-631 has both the ISR and
the leader and epoch. I think we can split this record into two
records:
1. ISR change that includes the topic, partition, and isr.
2. Leader change that includes the topic, partition, leader and leader epoch.
I'll bring this up in the discussion thread for that KIP.

> As for the
> performance benefit, I guess in theory snapshotting allows the snapshot to
> be updated in-place incrementally without having to read the full state in
> the snapshot. BTW, during compaction, we only read the cleaned data once
> instead of 3 times.
>

Doesn't compaction need to read the cleaned records to check whether the
key is in the map of keys to offsets? I made the following changes to
the KIP:

2. With log compaction the broker needs to
  a. read 1MB/s from the head of the log to update the in-memory state
  b. read 1MB/s to update the map of keys to offsets
  c. read 3MB/s (100MB from the already compacted log, 50MB from the
new key-value records) from the older segments. The log will
accumulate 50MB (50 seconds' worth of changes) before compacting
because the default configuration has a minimum clean ratio of 50%.

The "100MB in the already compacted log" are these cleaned records.
Let me know what you think and if I am missing something.

> 11. The KIP mentions topic id. Currently there is no topic id. Does this
> KIP depend on KIP-516?
>

For the purpose of measuring the impact, I was using the records
proposed by KIP-631. This KIP doesn't depend on KIP-516 or KIP-631 for
its design and implementation. I was just referencing that KIP in the
motivation and analysis. The KIP only assumes the changes in KIP-595,
which has been approved but is not yet part of trunk.

In the overview section the KIP mentions: "This KIP assumes that
KIP-595 has been approved and implemented. "

> 12. Is there a need to keep more than 1 snapshot? It seems we always expose
> the latest snapshot to clients.
>

The KIP proposes keeping more than one snapshot so as not to invalidate any
pending/concurrent `FetchSnapshot` requests that are attempting to fetch a
snapshot that could be deleted. I'll remove this wording as the first
version of this implementation probably won't have this feature,
since it requires extra coordination. The implementation will still allow
for multiple snapshots because generating a snapshot is not atomic
with respect to increasing the LBO.


> 13. "During leader election, followers with incomplete or missing snapshot
> will send a vote request and response as if they had an empty log." Hmm, a
> follower may not have a snapshot created, but that doesn't imply its log is
> empty.
>

Yes. I fixed the "Validation of Snapshot and Log"  and that sentence.
I basically added an additional condition where a snapshot is not
required if the LBO is 0.

> 14. "LBO is max.replication.lag.ms old." Not sure that I follow. How do we
> compare an offset to a time?
>

Yeah. This may be hard to implement. I am trying to avoid invalidating
followers and observers by aggressively deleting an offset/record
which they are trying to fetch. It is possible that
`controller.snapshot.minimum.records` is good enough to throttle
increasing LBO.

> 15. "Followers and observers will increase their log begin offset to the
> value sent on the fetch response as long as the local state machine has
> generated a snapshot that includes to the log begin offset minus one." Does
> the observer store the log? I thought it only needed to maintain a
> snapshot. If so, the observer doesn't need to maintain LBO.
>

In KIP-595 observers are similar to voters/followers in that they
Fetch the log and the snapshot. Two of the distinctions are that they
don't participate in the leader election and they are not included
when computing the high-watermark. Regarding storing: it is possible
that observers never need to store the log since they don't vote or
become leaders. I think in the future we would like to implement
"KIP-642: Dynamic quorum reassignment" which would add the capability
to 

Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-08-05 Thread Jose Garcia Sancio
Once again, thanks for the feedback Jason,

My changes to the KIP are here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=18=17

And see my comments below...

On Mon, Aug 3, 2020 at 1:57 PM Jason Gustafson  wrote:
>
> Hi Jose,
>
> Thanks for the proposal. I think there are three main motivations for
> snapshotting over the existing compaction semantics.
>
> First we are arguing that compaction is a poor semantic fit for how we want
> to model the metadata in the cluster. We are trying to view the changes in
> the cluster as a stream of events, not necessarily as a stream of key/value
> updates. The reason this is useful is that a single event may correspond to
> a set of key/value updates. We don't need to delete each partition
> individually for example if we are deleting the full topic. Outside of
> deletion, however, the benefits of this approach are less obvious. I am
> wondering if there are other cases where the event-based approach has some
> benefit?
>

Yes. Another example of this is what KIP-631 calls FenceBroker. In the
current implementation of the Kafka Controller and the implementation
proposed in
KIP-631, whenever a broker is fenced the controller removes the broker
from the ISR and performs leader election if necessary. The impact of
this operation on replication is documented in section "Amount of Data
Replicated". I have also updated the KIP to reflect this.

> The second motivation is from the perspective of consistency. Basically we
> don't like the existing solution for the tombstone deletion problem, which
> is just to add a delay before removal. The case we are concerned about
> requires a replica to fetch up to a specific offset and then stall for a
> time which is longer than the deletion retention timeout. If this happens,
> then the replica might not see the tombstone, which would lead to an
> inconsistent state. I think we are already talking about a rare case, but I
> wonder if there are simple ways to tighten it further. For the sake of
> argument, what if we had the replica start over from the beginning whenever
> there is a replication delay which is longer than tombstone retention time?
> Just want to be sure we're not missing any simple/pragmatic solutions
> here...
>

We explore the changes needed to log compaction and the fetch protocol
such that it results in a consistent replicated log in the rejected
sections. I changed the KIP to also mention it in the motivation
section by adding a section called "Consistent Log and Tombstones"

> Finally, I think we are arguing that compaction gives a poor performance
> tradeoff when the state is already in memory. It requires us to read and
> replay all of the changes even though we already know the end result. One
> way to think about it is that compaction works O(the rate of changes) while
> snapshotting is O(the size of data). Contrarily, the nice thing about
> compaction is that it works irrespective of the size of the data, which
> makes it a better fit for user partitions. I feel like this might be an
> argument we can make empirically or at least with back-of-the-napkin
> calculations. If we assume a fixed size of data and a certain rate of
> change, then what are the respective costs of snapshotting vs compaction? I
> think compaction fares worse as the rate of change increases. In the case
> of __consumer_offsets, which sometimes has to support a very high rate of
> offset commits, I think snapshotting would be a great tradeoff to reduce
> load time on coordinator failover. The rate of change for metadata on the
> other hand might not be as high, though it can be very bursty.
>

This is a very good observation. If you assume that the number of keys
doesn't change but that we have frequent updates to its values then I
think that after log compaction the size of the compacted section of
the log is O(size of the data) + O(size of the tombstones). And as you
point out the size of the snapshot is also O(size of the data). I
think this is a reasonable assumption for topics like
__cluster_metadata and __consumer_offsets.

The difference is the number of reads required. With in-memory
snapshots we only need to read the log once. With log compaction we
need to read the log 3 times: 1. to update the in-memory state, 2. to
generate the map of keys to offsets, and 3. to compact the log using the map
of keys to offsets. I have updated the KIP and go into a lot more
detail in the section "Loading State and Frequency of Compaction".

-- 
-Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-08-04 Thread Jose Garcia Sancio
Thanks for your feedback Jason. I'll have a more detailed reply and
update to the KIP by EOD today.

On Mon, Aug 3, 2020 at 1:57 PM Jason Gustafson  wrote:
>
> Hi Jose,
>
> Thanks for the proposal. I think there are three main motivations for
> snapshotting over the existing compaction semantics.
>
> First we are arguing that compaction is a poor semantic fit for how we want
> to model the metadata in the cluster. We are trying to view the changes in
> the cluster as a stream of events, not necessarily as a stream of key/value
> updates. The reason this is useful is that a single event may correspond to
> a set of key/value updates. We don't need to delete each partition
> individually for example if we are deleting the full topic. Outside of
> deletion, however, the benefits of this approach are less obvious. I am
> wondering if there are other cases where the event-based approach has some
> benefit?
>
> The second motivation is from the perspective of consistency. Basically we
> don't like the existing solution for the tombstone deletion problem, which
> is just to add a delay before removal. The case we are concerned about
> requires a replica to fetch up to a specific offset and then stall for a
> time which is longer than the deletion retention timeout. If this happens,
> then the replica might not see the tombstone, which would lead to an
> inconsistent state. I think we are already talking about a rare case, but I
> wonder if there are simple ways to tighten it further. For the sake of
> argument, what if we had the replica start over from the beginning whenever
> there is a replication delay which is longer than tombstone retention time?
> Just want to be sure we're not missing any simple/pragmatic solutions
> here...
>
> Finally, I think we are arguing that compaction gives a poor performance
> tradeoff when the state is already in memory. It requires us to read and
> replay all of the changes even though we already know the end result. One
> way to think about it is that compaction works O(the rate of changes) while
> snapshotting is O(the size of data). Contrarily, the nice thing about
> compaction is that it works irrespective of the size of the data, which
> makes it a better fit for user partitions. I feel like this might be an
> argument we can make empirically or at least with back-of-the-napkin
> calculations. If we assume a fixed size of data and a certain rate of
> change, then what are the respective costs of snapshotting vs compaction? I
> think compaction fares worse as the rate of change increases. In the case
> of __consumer_offsets, which sometimes has to support a very high rate of
> offset commits, I think snapshotting would be a great tradeoff to reduce
> load time on coordinator failover. The rate of change for metadata on the
> other hand might not be as high, though it can be very bursty.
>
> Thanks,
> Jason
>
>
> On Wed, Jul 29, 2020 at 2:03 PM Jose Garcia Sancio 
> wrote:
>
> > Thanks Ron for the additional comments and suggestions.
> >
> > Here are the changes to the KIP:
> >
> > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=17=15
> >
> > On Wed, Jul 29, 2020 at 8:44 AM Ron Dagostino  wrote:
> > >
> > > Thanks, Jose.  It's looking good.  Here is one minor correction:
> > >
> > > <<< If the Kafka topic partition leader receives a fetch request with an
> > > offset and epoch greater than or equal to the LBO (x + 1, a)
> > > >>> If the Kafka topic partition leader receives a fetch request with an
> > > offset and epoch greater than or equal to the LBO (x + 1, b)
> > >
> >
> > Done.
> >
> > > Here is one more question.  Is there an ability to evolve the snapshot
> > > format over time, and if so, how is that managed for upgrades? It would
> > be
> > > both Controllers and Brokers that would depend on the format, correct?
> > > Those could be the same thing if the controller was running inside the
> > > broker JVM, but that is an option rather than a requirement, I think.
> > > Might the Controller upgrade have to be coordinated with the broker
> > upgrade
> > > in the separate-JVM case?  Perhaps a section discussing this would be
> > > appropriate?
> > >
> >
> > The content sent through the FetchSnapshot RPC is expected to be
> > compatible with future changes. In KIP-631 the Kafka Controller is
> > going to use the existing Kafka Message and versioning scheme.
> > Specifically see section "Record Format Versions". I added some
> > wording around this.
> >
> > Thanks!
> > -Jose
> >



-- 
-Jose


Re: [VOTE] KIP-595: A Raft Protocol for the Metadata Quorum

2020-08-03 Thread Jose Garcia Sancio
+1.

Thanks for the detailed KIP!

On Mon, Aug 3, 2020 at 11:03 AM Jason Gustafson  wrote:
>
> Hi All, I'd like to start a vote on this proposal:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum.
> The discussion has been active for a bit more than 3 months and I think the
> main points have been addressed. We have also moved some of the pieces into
> follow-up proposals, such as KIP-630.
>
> Please keep in mind that the details are bound to change as all of
> the pieces start coming together. As usual, we will keep this thread
> notified of such changes.
>
> For me personally, this is super exciting since we have been thinking about
> this work ever since I started working on Kafka! I am +1 of course.
>
> Best,
> Jason



-- 
-Jose


Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-08-03 Thread Jose Garcia Sancio
Thanks for the KIP Colin,

Here is a partial review:

1.
> Even when a broker and a controller are co-located in the same JVM, they must 
> have different node IDs

Why? What problem are you trying to solve?

2.
> Node IDs must be set in the configuration file for brokers and controllers.

I understand that controller IDs must be static and in the
configuration file to be able to reach consensus in KIP-595. Why
can't the broker nodes, which are observers in KIP-595, discover
their ID on first boot and persist it for consistency across future
restarts?

3.
> Controller processes will listen on a separate endpoint from brokers

Why is this? Kafka supports multiple endpoints. For example, one broker
can have one endpoint for listening to connections from other brokers
and another endpoint for connections from admin, producer and consumer
clients.

4.
> In the case of controller RPCs like AlterIsr, the controller handles this by 
> not sending back a response until the designated change has been persisted.

Should we enumerate these RPCs? For example, we also have
`ListPartitionReassignments` which is a read operation and goes
directly to the controller. The naive solution would be to return the
uncommitted state in the controller.

5.
This KIP mentions a topic named __kafka_metadata. KIP-595 and KIP-630
mention a partition named __cluster_metadata. We should reconcile this
difference.

--
-Jose


Re: [DISCUSS] KIP-649: Dynamic Client Configuration

2020-07-31 Thread Jose Garcia Sancio
Thanks for the KIP Ryan. Here are some of my observations.

1.
> We will also be adding a new client configuration enable.dynamic.config to 
> both the producer and consumer that will be true by default so that the user 
> has the option to disable this feature.

How about?
"The Java producer and consumer clients will have a new configuration
property `enable.dynamic.config` with a default value of `true`. When
this configuration property is true the proposed producer and consumer
changes in this KIP are enabled."

You can go into more details in the producer and consumer changes section.
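
For illustration, opting out on a producer would look something like the
sketch below (it assumes the `enable.dynamic.config` name proposed in this
KIP; it is not an existing client property):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;

    public class DynamicConfigOptOut {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            // Proposed in this KIP: defaults to true; set to false to opt out
            // of broker-provided dynamic configs.
            props.put("enable.dynamic.config", "false");
            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                // ... use the producer as usual
            }
        }
    }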

2.
> ... the following order of precedence:

I would be a bit more specific here. For example, when the broker
handles a `DescribeConfigsRequest`, it will first use the client config
key-values stored in `/config/clients/<default>`. If there is a
matching `/config/clients/<client-id>` znode then any client config
key-value in that znode will override the key-values found in
`/config/clients/<default>`.
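
As a sketch of the precedence I have in mind (the class and method names
below are made up for illustration):

    import java.util.HashMap;
    import java.util.Map;

    public class ClientConfigPrecedence {
        // Values read from /config/clients/<client-id> override the values
        // read from /config/clients/<default>.
        static Map<String, String> resolve(Map<String, String> defaults,
                                           Map<String, String> perClientId) {
            Map<String, String> resolved = new HashMap<>(defaults);
            resolved.putAll(perClientId);  // matching keys override the defaults
            return resolved;
        }
    }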

3.
> These dynamic configs will be stored in zookeeper as the children of the 
> ZNode /config/clients

How are the client dynamic config key-values stored here? I assume
that they are updated based on the content of the
`IncrementalAlterConfigsRequest`. When is `.../<client-id>` updated?
When is `.../<default>` updated?

4.
> The interval on which dynamic configs are fetched will be the same amount of 
> time as the interval for MetadataRequest which is currently five minutes.

Will this be hard-coded to 5 minutes? Or is this KIP going to use the
same frequency as the producer config `metadata.max.age.ms`? Same
question for the "Consumer Changes" section.

5.
The Consumer Changes section mentions that the consumer would ask for
the dynamic configuration from the broker before joining the group
coordinator. This makes sense to me. How about the producer? Should
the producer also describe the dynamic configuration before sending
acks for the "produce" messages?

6.
For the Admin Client Changes section, how are DescribeConfigs and
IncrementalAlterConfigs requests going to get routed by the client to
the different brokers in the cluster?

7.
You mentioned that the producer and the consumer will validate the
keys and values received from the broker through DescribeConfigs. Will
the ConfigCommand validate any of the keys or values specified in
--add-config and --delete-config? Will the broker validate any of the
keys or values received in the IncrementalAlterConfigs?

8.
In rejected ideas the KIP says:
> This might make sense for certain configurations such as acks, but does not 
> for others such as timeouts.

I don't think it makes sense even for acks since the clients of the
Java Producer assume that all of the produce messages are sent with
the same ack value.

-- 
-Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-07-29 Thread Jose Garcia Sancio
Thanks Ron for the additional comments and suggestions.

Here are the changes to the KIP:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=17=15

On Wed, Jul 29, 2020 at 8:44 AM Ron Dagostino  wrote:
>
> Thanks, Jose.  It's looking good.  Here is one minor correction:
>
> <<< If the Kafka topic partition leader receives a fetch request with an
> offset and epoch greater than or equal to the LBO (x + 1, a)
> >>> If the Kafka topic partition leader receives a fetch request with an
> offset and epoch greater than or equal to the LBO (x + 1, b)
>

Done.

> Here is one more question.  Is there an ability to evolve the snapshot
> format over time, and if so, how is that managed for upgrades? It would be
> both Controllers and Brokers that would depend on the format, correct?
> Those could be the same thing if the controller was running inside the
> broker JVM, but that is an option rather than a requirement, I think.
> Might the Controller upgrade have to be coordinated with the broker upgrade
> in the separate-JVM case?  Perhaps a section discussing this would be
> appropriate?
>

The content sent through the FetchSnapshot RPC is expected to be
compatible with future changes. In KIP-631 the Kafka Controller is
going to use the existing Kafka Message and versioning scheme.
Specifically see section "Record Format Versions". I added some
wording around this.

Thanks!
-Jose


Re: [DISCUSS] KIP-630: Kafka Raft Snapshot

2020-07-28 Thread Jose Garcia Sancio
Thanks Ron. Your comments and suggestions were helpful. You can see my
changes to the KIP here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763=15=14

My comments are below...

On Mon, Jul 27, 2020 at 11:29 AM Ron Dagostino  wrote:
>
> Hi Jose.  Thanks for the KIP.  Here are some questions and some nit 
> corrections.
>
> <<< In KIP-500 the Kafka Controller, which is the quorum leader from
> KIP-595, will materialize the entries in the metadata log into memory.
> Technically I think the quorum leader is referred to as the Active
> Controller in KIP-500.  Maybe replace "Kafka Controller" with "Active
> Controller"?  I think the term "Kafka Controller" is fine as used
> throughout the rest of the KIP to refer to the entire thing, but when
> referring specifically to the leader I think "Active Controller" is
> the term that is defined in KIP-500.
>

Made those changes.

>
> <<< Each broker in KIP-500, which will be a replica of the metadata
> log, will materialize the entries in the log into a metadata cache
> This wording confused me because I assumed that "replica" was a formal
> term and only (non-Active) Controllers are formally "replicas" of the
> metadata log -- Kafka brokers would be clients that read the log and
> then use the data for their own purpose as opposed to formally being
> replicas with this understanding of the term "replica".  Is that
> correct, and if so, maybe replace "replica" with "client"?
>

In KIP-595 we have two types of replicas: voters and observers. Voter
replicas are Kafka Controllers and one of them will become the
Active Controller. Observer replicas fetch from the log and attempt to
keep up with the LEO of the Active Controller. I think you can
consider all of them as "clients" of the replicated log.

>
> <<< The type of in-memory state machines what we plan to implement
> >>> The type of in-memory state machines that we plan to implement
> nit
>

Done.

>
> <<< doesn't map very well to an key and offset based clean up policy.
> >>> doesn't map very well to a key and offset based clean up policy.
> nit
>

Done.

>
> <<< When starting a broker either because it is a new broker, a broker
> was upgraded or a failed broker is restarting. Loading the state
> represented by the __cluster_metadata topic partition is required
> before the broker is available
> >>> When starting a broker either because it is a new broker or it is 
> >>> restarting, loading the state represented by the __cluster_metadata topic 
> >>> partition is required before the broker is available.
> Reword for simplicity and clarity?
>

Done.

>
> <<< With snapshot based of the in-memory state Kafka can be much more 
> aggressive
> >>> By taking and transmitting a snapshot of the in-memory state as described 
> >>> below Kafka can be much more aggressive
> Tough to refer to the concept of snapshot here without having
> described what it is, so refer to "as described below" to help orient
> the reader?
>

Made some changes to these sentences. I agree that fully understanding
parts of the motivation section requires reading the rest of the
document. I wanted to make sure that we had this in the motivation
section.

>
> <<< In the future this mechanism will also be useful for
> high-throughput topic partitions like the Group Coordinator and
> Transaction Coordinator.
> >>> In the future this mechanism may also be useful for high-throughput topic 
> >>> partitions like the Group Coordinator and Transaction Coordinator.
> Tough to say "will" when that is an assumption that would depend on a KIP?
>

Yeah. Changed it.

>
> << __cluster_metadata topic partition then the ... Kafka Controller will
> need to replicate 3.81 MB to each broker in the cluster (10) or 38.14
> MB.
> It might be good to append a sentence that explicitly states how much
> data is replicated for the delta/event -- right now it is implied to
> be very small, but that's kind of like leaving the punch line to a
> joke implied :-)
>

Thanks. I updated the example and added numbers for the events/deltas case.


>
> <<< Follower and observer replicas fetch the snapshots from the leader
> they attempt to fetch an offset from the leader and the leader doesn’t
> have that offset in the log
> >>> Follower and observer replicas fetch a snapshot from the leader when they 
> >>> attempt to fetch an offset from the leader and the leader doesn’t have 
> >>> that offset in the log
> nit
>

Done.

>
> >>> Generating and loading the snapshot will be delegated to the Kafka 
> >>> Controller.
> >>> The Kafka Controller will notify the Kafka Raft client when it has 
> >>> generated a snapshot and up to which offset is included in the snapshot.
> >>> The Kafka Raft client will notify the Kafka Controller when a new 
> >>> snapshot has been fetched from the leader.
> This paragraph confuses me.  What is the "Kafka Raft client" -- is
> this the broker? Or is it some other subsystem (or all other
> subsystems aside from log 

[DISCUSS] KIP-630: Kafka Raft Snapshot

2020-07-26 Thread Jose Garcia Sancio
Hi All,

I would like to start a discussion on KIP-630:
https://cwiki.apache.org/confluence/x/exV4CQ

This proposal specifies extensions to KIP-595 to support generating
snapshots for the replicated log. Please let me know if you have any
comments and suggestions.

Thanks!
-- 
-Jose


Re: [VOTE] KIP-590: Redirect Zookeeper Mutation Protocols to The Controller

2020-06-17 Thread Jose Garcia Sancio
+1.

Thanks for the KIP and looking forward to the improvement implementation.

On Wed, Jun 17, 2020 at 2:24 PM Guozhang Wang  wrote:
>
> Thanks for the KIP Boyang, +1 from me.
>
>
> Guozhang
>
> On Wed, Jun 17, 2020 at 1:40 PM Colin McCabe  wrote:
>
> > Thanks, Boyang!  +1 (binding)
> >
> > best,
> > Colin
> >
> > On Mon, Jun 15, 2020, at 12:59, Boyang Chen wrote:
> > > Thanks for more feedback Colin! I have addressed them in the KIP.
> > >
> > > Boyang
> > >
> > > On Mon, Jun 15, 2020 at 11:29 AM Colin McCabe 
> > wrote:
> > >
> > > > On Fri, Jun 12, 2020, at 15:30, Boyang Chen wrote:
> > > > > Thanks Colin for the suggestions!
> > > > >
> > > > > On Fri, Jun 12, 2020 at 2:40 PM Colin McCabe 
> > wrote:
> > > > >
> > > > > > Hi Boyang,
> > > > > >
> > > > > > Thanks for the KIP!  I think it's getting close.
> > > > > >
> > > > > >  > For older requests that need redirection, forwarding
> > > > > >  > broker will just use its own authorizer to verify the
> > principals.
> > > > When
> > > > > > the
> > > > > >  > request looks good, it will just forward the request with its
> > own
> > > > > >  > credentials, no second validation needed
> > > > > >
> > > > > > Just to be clear, the controller will still validate the request,
> > > > right?
> > > > > > But at that point the principal will be the broker principal.  It
> > > > would be
> > > > > > good to note that here.
> > > > > >
> > > > > > Sounds good, cleared in the KIP.
> > > > >
> > > > > > Internal CreateTopicsRequest Routing
> > > > > >
> > > > > > The forwarding broker is sending the request as the latest version,
> > > > > > right?  It would be good to add a note of this.  This also prevents
> > > > routing
> > > > > > loops since the latest version is not forwardable (another good
> > thing
> > > > to
> > > > > > add, I think...)
> > > > > >
> > > > > We are not bumping the CreateTopic RPC here, so it should be the
> > latest
> > > > > by default.
> > > > >
> > > >
> > > > Sorry, CreateTopics was a bad example here, since it already must be
> > sent
> > > > to the controller.  Oops.
> > > >
> > > > >
> > > > > And just to be clear, we are not "forwarding" but actually
> > > > > sending a CreateTopicRequest from the receiving broker to the
> > controller
> > > > > broker.
> > > > >
> > > >
> > > > Right.  I think we agree on this point.  But we do need a term to
> > describe
> > > > the broker which initially receives the user request and resends it to
> > the
> > > > controller.  Resending broker?
> > > >
> > > > And I do think it's important to note that the request we send to the
> > > > controller can't be itself resent.
> > > >
> > > > >
> > > > >  > As we discussed in the request routing section, to work with an
> > older
> > > > > >  > client, the first contacted broker need to act as a proxy to
> > > > redirect
> > > > > > the
> > > > > >  > write request to the controller. To support the proxy of
> > requests,
> > > > we
> > > > > > need
> > > > > >  > to build a channel for brokers to talk directly to the
> > controller.
> > > > This
> > > > > >  > part of the design is internal change only and won’t block the
> > KIP
> > > > > >  > progress.
> > > > > >
> > > > > > I think it's good to note that we eventually want a separate
> > controller
> > > > > > endpoint in KIP-500.  However, we don't need it to implement
> > KIP-590,
> > > > > > right?  The other brokers could forward to the existing internal
> > > > endpoint
> > > > > > for the controller.  So maybe it's best to discuss the separate
> > > > endpoint in
> > > > > > "future work" rather than here.
> > > > > >
> > > > > > I moved the new endpoint part towards the future work and
> > addressed the
> > > > > > usage of controller internal endpoint for routing requests.
> > > > >
> > > >
> > > > Thanks.
> > > >
> > > > >
> > > > > > > === Start Old Proposal  ===
> > > > > >
> > > > > > I'm glad the old proposal shows up here, but I think this is too
> > much
> > > > > > detail.  It would be better to just have a one or two paragraph
> > > > summary of
> > > > > > the main points.  As it is, the old proposal takes up 40% of the
> > doc
> > > > which
> > > > > > is pretty confusing for someone reading through.  Let's also not
> > forget
> > > > > > that someone can just read the old version by using the "page
> > history"
> > > > > > function on the wiki.  So there's no need to keep that all here.
> > > > > >
> > > > > > Make sense, removed.
> > > > >
> > > >
> > > > Thanks again.
> > > >
> > > > >
> > > > >{ "name": "PrincipalName", "type": "string", "tag": 0,
> > > > "taggedVersions": "2+", "ignorable": true,
> > > > >  "about": "Optional value of the principal name when the request
> > is
> > > > redirected by a broker." },
> > > > >
> > > >
> > > > Maybe "InitialPrincipalName" would be better here?  PrincipalName is a
> > bit
> > > > confusing since the message already has a principal name, after all...
> > > >
> > > > cheers,
> > > > Colin
> > > >

Re: [VOTE] KIP-589: Add API to update Replica state in Controller

2020-05-21 Thread Jose Garcia Sancio
+1. LGTM David!

On Wed, May 20, 2020 at 12:22 PM David Arthur  wrote:
>
> Hello, all. I'd like to start the vote for KIP-589 which proposes to add a
> new AlterReplicaState RPC.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller
>
> Cheers,
> David



-- 
-Jose


Re: [DISCUSS] KIP-601: Configurable socket connection timeout

2020-05-07 Thread Jose Garcia Sancio
Cheng,

Thanks for the KIP and the detailed proposal section. LGTM!

On Thu, May 7, 2020 at 3:38 PM Cheng Tan  wrote:
>
> I think more about the potential wider use cases, I modified the proposal to 
> target all the connection. Thanks.
>
> - Best, - Cheng Tan
>
> > On May 7, 2020, at 1:41 AM, Cheng Tan  wrote:
> >
> > Hi Colin,
> >
> > Sorry for the confusion. I’m proposing to implement the timeout in
> > NetworkClient.leastLoadedNode() when iterating all the cached nodes. The
> > alternative I can think of is to implement the timeout in NetworkClient.poll()
> >
> > I’d prefer to implement in the leastLoadedNode(). Here’re the reasons:
> > Usually when clients send a request, they will asking the network client to 
> > send the request to a specific node. In this case, the 
> > connection.setup.timeout won’t matter too much because the client doesn’t 
> > want to try other nodes for that specific request. The request level 
> > timeout would be enough. The metadata fetcher fetches the nodes status 
> > periodically so the clients can reassign the request to another node after 
> > timeout.
> > Consumer, producer, and AdminClient are all using leastLoadedNode() for 
> > metadata fetch, where the connection setup timeout can play an important 
> > role. Unlike other requests can refer to the metadata for node condition, 
> > the metadata requests can only blindly choose a node for retry in the worst 
> > scenario. We want to make sure the client can get the metadata smoothly and 
> > as soon as possible. As a result, we need this connection.setup.timeout.
> > Implementing the timeout in poll() or anywhere else might need an extra 
> > iteration of all nodes, which might downgrade the network client 
> > performance.
> > I also updated the KIP content and KIP status. Please let me know if the 
> > above ideas make sense. Thanks.
> >
> > Best, - Cheng Tan
> >
> >
> >
> >> On May 4, 2020, at 5:26 PM, Colin McCabe wrote:
> >>
> >> Hi Cheng,
> >>
> >> On the KIP page, it lists this KIP as "draft."  It seems like "under 
> >> discussion" is appropriate here, right?
> >>
> >>> Currently, the initial socket connection timeout is depending on Linux 
> >>> kernel setting
> >>> tcp_syn_retries. The timeout value is 2 ^ (tcp_syn_retries + 1) - 1
> >>> seconds. For the
> >>> reasons below, we want to control the client-side socket timeout directly 
> >>> using
> >>> configuration files
> >>
> >> Linux is just one example of an OS that Kafka could run on, right?  You 
> >> could also be running on MacOS, for example.
> >>
> >>> I'm proposing to do a lazy socket connection time out. That is, we only 
> >>> check if
> >>> we need to timeout a socket when we consider the corresponding node as a
> >>> candidate in the node provider.
> >>
> >> The NodeProvider is an AdminClient abstraction, right?  Why wouldn't we 
> >> implement a connection setup timeout for all clients, not just AdminClient?
> >>
> >> best,
> >> Colin
> >>
> >> On Mon, May 4, 2020, at 13:18, Colin McCabe wrote:
> >>> Hmm.  A big part of the reason behind the KIP is that the default
> >>> connection timeout behavior of the OS doesn't work for Kafka, right?
> >>> For example, on Linux, if we wait 127 seconds for a connection attempt
> >>> to time out, we won't get a chance to make another attempt in most
> >>> cases.  So I think it makes sense to set a shorter default.
> >>>
> >>> best,
> >>> Colin
> >>>
> >>>
> >>> On Mon, May 4, 2020, at 09:44, Jose Garcia Sancio wrote:
> >>>> Thanks for the KIP Cheng,
> >>>>
> >>>>> The default value will be 10 seconds.
> >>>>
> >>>> I think we should make the default the current behavior. Meaning the
> >>>> default should leverage the default connect timeout from the operating
> >>>> system.
> >>>>
> >>>>> Proposed Changes
> >>>>
> >>>> I don't fully understand this section. It seems like it is mainly
> >>>> focused on the problem with the current implementation. Can you
> >>>> explain how the proposed changes solve the problem?
> >>>>
> >>>> Thanks.
> >>>>
> >>>>
> >>>> --
> >>>> -Jose
> >>>>
> >>>
> >
>


-- 
-Jose


Re: [DISCUSS] KIP-601: Configurable socket connection timeout

2020-05-04 Thread Jose Garcia Sancio
Thanks for the KIP Cheng,

> The default value will be 10 seconds.

I think we should make the default the current behavior. Meaning the
default should leverage the default connect timeout from the operating
system.
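
For reference, a small sketch of the kernel-derived timeout that the KIP
quotes (assuming the common Linux default of tcp_syn_retries = 6):

    // timeout = 2^(tcp_syn_retries + 1) - 1 seconds; with 6 retries this is
    // 2^7 - 1 = 127 seconds, which is the long wait the KIP wants to let
    // users shorten.
    int tcpSynRetries = 6;  // assumed kernel default
    long osConnectTimeoutSec = (1L << (tcpSynRetries + 1)) - 1;  // 127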

> Proposed Changes

I don't fully understand this section. It seems like it is mainly
focused on the problem with the current implementation. Can you
explain how the proposed changes solve the problem?

Thanks.


-- 
-Jose


Re: [DISCUSS] KIP-589 Add API to Update Replica State in Controller

2020-04-16 Thread Jose Garcia Sancio
Hi David,

Thanks for the KIP.

> ReplicaStateEventResponse => ErrorCode [Topic [PartitionId]]
>   ErrorCode => Int32
>   Topic => String
>   PartitionId => Int32
> ...
> Partition-level errors:

Based on my understanding of the response, it doesn't look like the
controller has a way of encoding these partition-level errors.

> INVALID_REQUEST: The update was rejected due to some internal inconsistency 
> (e.g. invalid replicas specified in the ISR)

What do you mean by "invalid replicas specified in the ISR"? There is
no reference to the ISR in the request. Related to this, can you mention
how the broker/replica will handle the errors included in the
ReplicaStateEventResponse?

> In this proposal, we now include the broker ID and epoch in the request, so 
> the controller can safely update its internal replica state based on the 
> request data.
> ...
> If the controller reads the ReplicaStateEvent and encounters a fatal error 
> before handling the subsequent ControllerEvent, a new controller will 
> eventually be elected and the state of all replicas will become known to the 
> new controller.

I think we should mention that the controller will keep its current
implementation of marking replicas as offline based on failures in
the LeaderAndIsr response.

-- 
-Jose


Re: [DISCUSS] KIP-590: Redirect Zookeeper Mutation Protocols to The Controller

2020-04-16 Thread Jose Garcia Sancio
Hi Boyang,

Thanks for the KIP. The KIP looks good. I have a few questions and comments.

> As part of the KIP-500

initiative, we need to build a bridge release version of Kafka that could
isolate the direct Zookeeper write access only to the controller.

This may be outside the scope of this KIP but I am trying to understand how
this is going to be used to implement KIP-500. My understanding is that the
Admin client discovers the controller by performing a
Metadata{Request,Response} round trip. The response of this request
includes the id of the controller. Based on my understanding of the
KIP-500, this architecture will need to change. For example the controller
will not necessarily be a broker in which case the id may not correlate to
a broker id. Is the expectation that the Kafka Controller Quorum (as
defined in KIP-500) will push this new connection information to all of the
brokers? Will the Kafka Controller Quorum expose and implement all of the
RPCs being redirected in this KIP and the ones that are currently routed to
the controller? These include:

ListPartitionReassignment
AlterPartitionReassignment
ElectLeaders
CreatePartitions
DeleteTopics
CreateTopics

> AUTHORIZATION_FAILED if the inter-broker verification failed.

The rest of the document mentions CLUSTER_AUTHORIZATION_FAILED.

> For CLUSTER_AUTHORIZATION_FAILED, this indicates an internal error for
broker security setup which has nothing to do with the client, so we have
no other way but returning an UNKNOWN_SERVER_ERROR to the admin client.

I don't understand this. I think that is because it is not clear to me
who performs authorization, and how and when it happens, when using
Envelope{Request,Response}. Can you please add a section that explains how
authorization works when envelopes are involved?

-- 
-Jose


Re: [DISCUSS] KIP-158 UPDATED: Enable source connectors to create new topics with specific configs in Kafka Connect during runtime

2020-02-03 Thread Jose Garcia Sancio
Thanks Konstantine. Looking forward to this feature.

The KIP mentions:

> For the *default* group this configuration is required. For any other
group defined in topic.creation.groups this config is optional and if it's
missing it gets the value the *default* group

For the properties "topic.creation.$alias.replication.factor" and
"topic.creation.$alias.partitions", I think that we can and should make
them optional for all groups including the "default" group. Kafka's
CreateTopics request message allows these two fields to be optional. Here
are their descriptions, respectively:

> The number of replicas to create for each partition in the topic, or -1
if we are either specifying a manual partition assignment or using the
default replication factor.
> The number of partitions to create in the topic, or -1 if we are either
specifying a manual partition assignment or using the default partitions.

At the Java client level this is modeled using Java's Optional type. I think
that we can make them both optional and resolve them to "Optional.empty()"
if neither the specific group nor the "default" group sets a value.
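
As an illustration, the Java Admin client can already express "use the
broker defaults" through the Optional-based NewTopic constructor added by
KIP-464 (the topic name and bootstrap address below are made up):

    import java.util.Collections;
    import java.util.Optional;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    public class BrokerDefaultTopicSettings {
        public static void main(String[] args) throws Exception {
            // Neither the group nor "default" defines these values, so leave
            // them empty and let the broker apply num.partitions and
            // default.replication.factor.
            NewTopic topic = new NewTopic("connector-managed-topic",
                Optional.empty(), Optional.empty());

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                admin.createTopics(Collections.singleton(topic)).all().get();
            }
        }
    }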

Thanks,
Jose


On Thu, Dec 19, 2019 at 8:27 PM Tom Bentley  wrote:

> Thanks Konstantine, lgtm.
>
> On Thu, Dec 19, 2019 at 5:34 PM Ryanne Dolan 
> wrote:
>
> > Thanks for the reply Konstantine. Makes sense.
> >
> > Ryanne
> >
> > On Tue, Dec 17, 2019, 6:41 PM Konstantine Karantasis <
> > konstant...@confluent.io> wrote:
> >
> > > Thanks Randall and Ryanne for your comments.
> > >
> > > I'm replying to them below, in order of appearance:
> > >
> > > To Randall's comments:
> > > 1) I assumed these properties would be visible to connectors, since by
> > > definition these are connector properties. I added a mention. However
> I'm
> > > not sure if you are also making a specific suggestion with this
> > question. I
> > > didn't find a similar mention in KIP-458, but 'override' directives
> also
> > > appear in both the connector and the task properties. Given this
> > precedent,
> > > I think it makes sense to forward these properties to the connector as
> > > well.
> > >
> > > 2) Doesn't hurt to add a note in the KIP. Added in the table. This
> > > definitely belongs to the Kafka Connect docs that will describe how to
> > > operate Connect with this feature enabled.
> > >
> > > 3) Added a note to mention that a task might fail during runtime and
> that
> > > early validation won't be in place for this feature.
> > >
> > > 4) Examples added and the sentence regarding ACLs and failure was
> > adjusted
> > > to reflect the new proposal.
> > >
> > > 5) Also addressed and the KIP now mentions that the task will fail if
> the
> > > feature is enabled and the broker does not support the Admin API.
> > >
> > > To your point Ryanne, I'm also often in favor of reserving some room
> for
> > > customizations that will be able to address specific user needs, but I
> > > don't think we have a strong case for making this functionality
> pluggable
> > > at the moment. Topics are not very transient entities in Kafka. And
> this
> > > feature is focusing specifically on topic creation and does not suggest
> > > altering configuration of existing topics, including topics that may be
> > > created once by a connector that will use this new functionality.
> > > Therefore, adapting to changes to the attainable replication factor
> > during
> > > runtime, without expressing this in the configuration of a connector
> > seems
> > > to involve more risks than benefits. Overall, a generic topic creation
> > hook
> > > shares similarities to exposing an admin client to the connector itself
> > and
> > > based on previous discussions, seems that this approach will result in
> > > considerable extensions in both configuration and implementation
> without
> > it
> > > being fully justified at the moment.
> > >
> > > I suggest moving forward without pluggable classes for now, and if in
> the
> > > future we wish to return to this topic for second iteration, then
> > factoring
> > > out the proposed functionality under the configuration of a module that
> > > applies topic creation based on regular expressions should be easy to
> do
> > in
> > > a compatible way.
> > >
> > > Best,
> > > Konstantine
> > >
> > >
> > > On Thu, Dec 12, 2019 at 1:37 PM Ryanne Dolan 
> > > wrote:
> > >
> > > > Konstantine, thanks for the updates. I wonder if we should take your
> > > > proposal one step further and make this pluggable. Your
> include/exclude
> > > > regexes are great out-of-the-box features, but it may be valuable to
> > > > plug-in more sophisticated logic to handle topic creation.
> > > >
> > > > Instead of enabling/disabling the feature as a whole, the default
> > > > TopicCreator (or whatever) could be a nop. Then we include a
> > > > RegexTopicCreator with your proposed behavior. This would be almost
> > > > indistinguishable from your current KIP from a user's perspective,
> but
> > > > would enable plug-in TopicCreators that do some of the things you
> 

Re: [DISCUSS] KIP-555: Deprecate Direct Zookeeper access in Kafka Administrative Tools

2020-01-10 Thread Jose Garcia Sancio
Comments below.

On Thu, Jan 9, 2020 at 5:03 PM Colin McCabe  wrote:

> That's a good question.  The current plan is for the 3.x releases to still
> require ZooKeeper.  What we will drop in 3.x is direct ZK access in
> command-line administrative tools (unless those tools are specifically
> about administering ZK itself, like launching it, stopping it, editing its
> internal settings, etc.)
>
>
Makes sense to me. Thanks for the clarification.

-- 
-Jose


Re: [VOTE] KIP-551: Expose disk read and write metrics

2020-01-10 Thread Jose Garcia Sancio
+1, LGTM.

On Fri, Jan 10, 2020 at 2:19 PM Gwen Shapira  wrote:

> +1, thanks for driving this
>
> On Fri, Jan 10, 2020 at 2:17 PM Colin McCabe  wrote:
> >
> > Hi all,
> >
> > I'd like to start the vote on KIP-551: Expose disk read and write
> metrics.
> >
> > KIP:  https://cwiki.apache.org/confluence/x/sotSC
> >
> > Discussion thread:
> >
> https://lists.apache.org/thread.html/cfaac4426455406abe890464a7f4ae23a5c69a39afde66fe6eb3d696%40%3Cdev.kafka.apache.org%3E
> >
> > cheers,
> > Colin
>


-- 
-Jose


Re: [DISCUSS] KIP-555: Deprecate Direct Zookeeper access in Kafka Administrative Tools

2020-01-09 Thread Jose Garcia Sancio
Thanks Colin,

For the tools that only support ZooKeeper (zookeeper-security-migration.sh
and zookeeper-shell.sh), should we be deprecating those tools entirely for
removal in a future 3.0 release?

On Thu, Jan 9, 2020 at 4:24 PM Colin McCabe  wrote:

> Hi all,
>
> I wrote KIP about deprecating the --zookeeper flag in the administrative
> tools.  Take a look if you get a chance:
>
> https://cwiki.apache.org/confluence/x/sotSC
>
> best,
> Colin
>


-- 
-Jose


Re: [DISCUSS] KIP-551: Expose disk read and write metrics

2020-01-09 Thread Jose Garcia Sancio
Thanks Colin,

LGTM in general. The Linux documentation (
https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/Documentation/filesystems/proc.txt?id=HEAD#n1644)
defines these metrics as

> read_bytes
> ----------
>
> I/O counter: bytes read
> Attempt to count the number of bytes which this process really did cause to
> be fetched from the storage layer. Done at the submit_bio() level, so it is
> accurate for block-backed filesystems. <please add status regarding NFS and CIFS at a later time>
>
>
> write_bytes
> ---
>
> I/O counter: bytes written
> Attempt to count the number of bytes which this process caused to be sent
> to
> the storage layer. This is done at page-dirtying time.
>

It looks like there is also another metric (cancelled_write_bytes) that
affects the value reported in write_bytes. Do we want to take that into
account when reporting the JMX metric
kafka.server:type=KafkaServer,name=DiskWriteBytes?

cancelled_write_bytes
> -
>
> The big inaccuracy here is truncate. If a process writes 1MB to a file and
> then deletes the file, it will in fact perform no writeout. But it will
> have
> been accounted as having caused 1MB of write.
> In other words: The number of bytes which this process caused to not
> happen,
> by truncating pagecache. A task can cause "negative" IO too. If this task
> truncates some dirty pagecache, some IO which another task has been
> accounted
> for (in its write_bytes) will not be happening. We _could_ just subtract
> that
> from the truncating task's write_bytes, but there is information loss in
> doing
> that.



On Mon, Jan 6, 2020 at 5:28 PM Colin McCabe  wrote:

> On Tue, Dec 10, 2019, at 11:10, Magnus Edenhill wrote:
> > Hi Colin,
> >
>
> Hi Magnus,
>
> Thanks for taking a look.
>
> > aren't those counters (ever increasing), rather than gauges
> (fluctuating)?
>
> Since this is in the Kafka broker, we're using Yammer.  This might be
> confusing, but Yammer's concept of a "counter" is not actually monotonic.
> It can decrease as well as increase.
>
> In general Yammer counters require you to call inc(amount) or dec(amount)
> on them.  This doesn't match up with what we need to do here, which is to
> (essentially) make a callback into the kernel by reading from /proc.
>
> The counter/gauge dichotomy doesn't affect the JMX, (I think?), so it's
> really kind of an implementation detail.
>
> >
> > You also mention CPU usage as a side note, you could use getrusage(2)'s
> > ru_utime (user) and ru_stime (sys)
> > to allow the broker to monitor its own CPU usage.
> >
>
> Interesting idea.  It might be better to save that for a future KIP,
> though, to avoid scope creep.
>
> best,
> Colin
>
> > /Magnus
> >
> > Den tis 10 dec. 2019 kl 19:33 skrev Colin McCabe :
> >
> > > Hi all,
> > >
> > > I wrote KIP about adding support for exposing disk read and write
> > > metrics.  Check it out here:
> > >
> > > https://cwiki.apache.org/confluence/x/sotSC
> > >
> > > best,
> > > Colin
> > >
> >
>


-- 
-Jose