Re: maxParallelForks while running tests

2022-04-15 Thread Unmesh Joshi
I am not running into any specific issue with the Kafka code. I was just
curious to know whether any specific fixes were done to handle this issue.

On Fri, Apr 15, 2022 at 8:24 AM Luke Chen  wrote:

> Hi Unmesh,
>
> Are you running into any issue with that?
>
> So far, the `maxParallelForks` can be set via gradle argument:
> https://github.com/apache/kafka/blob/trunk/build.gradle#L78
>
> And in Jenkins, it looks like we default to 2.
> https://github.com/apache/kafka/blob/trunk/Jenkinsfile#L40
>
> Thank you.
> Luke
>
> On Fri, Apr 15, 2022 at 1:24 AM Unmesh Joshi 
> wrote:
>
> > Hi,
> > I came across this issue which has discussion about capping
> > maxParallelForks while running tests.
> > https://issues.apache.org/jira/browse/KAFKA-2613
> > Is this still the case?
> >
> > Thanks,
> > Unmesh
> >
>


maxParallelForks while running tests

2022-04-14 Thread Unmesh Joshi
Hi,
I came across this issue, which discusses capping maxParallelForks
while running tests.
https://issues.apache.org/jira/browse/KAFKA-2613
Is this still the case?

Thanks,
Unmesh


Re: [VOTE] voting on KIP-631: the quorum-based Kafka controller

2020-12-16 Thread Unmesh Joshi
I went through the changes since the last discussion thread, and it's
looking in good shape. Thanks!
+1 (non-binding)

On Wed, Dec 16, 2020 at 4:34 PM Tom Bentley  wrote:

> Thanks for the KIP Colin, it does a great job of clearly explaining some
> pretty complex changes.
>
> +1 (non-binding)
>
> Tom
>
>
>
> On Tue, Dec 15, 2020 at 7:13 PM Boyang Chen 
> wrote:
>
> > Thanks Colin for the great work to polish the KIP and reach this final
> > stage. +1 (binding) from me
> >
> > On Tue, Dec 15, 2020 at 9:11 AM David Arthur  wrote:
> >
> > > Colin, thanks for driving this. I just read through the KIP again and I
> > > think it is in good shape. Exciting stuff!
> > >
> > > +1 binding
> > >
> > > -David
> > >
> > > On Sat, Dec 12, 2020 at 7:46 AM Ron Dagostino 
> wrote:
> > >
> > > > Thanks for shepherding this KIP through the extended discussion,
> Colin.
> > > I
> > > > think we’ve ended up in a good place.  I’m sure there will be more
> > tweaks
> > > > along the way, but the fundamentals are in place.  +1 (non-binding)
> > from
> > > me.
> > > >
> > > > Ron
> > > >
> > > > > On Dec 11, 2020, at 4:39 PM, Colin McCabe 
> > wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > I'd like to restart the vote on KIP-631: the quorum-based Kafka
> > > > Controller.  The KIP is here:
> > > > >
> > > > > https://cwiki.apache.org/confluence/x/4RV4CQ
> > > > >
> > > > > The original DISCUSS thread is here:
> > > > >
> > > > >
> > > >
> > >
> >
> https://lists.apache.org/thread.html/r1ed098a88c489780016d963b065e8cb450a9080a4736457cd25f323c%40%3Cdev.kafka.apache.org%3E
> > > > >
> > > > > There is also a second email DISCUSS thread, which is here:
> > > > >
> > > > >
> > > >
> > >
> >
> https://lists.apache.org/thread.html/r1ed098a88c489780016d963b065e8cb450a9080a4736457cd25f323c%40%3Cdev.kafka.apache.org%3E
> > > > >
> > > > > Please take a look and vote if you can.
> > > > >
> > > > > best,
> > > > > Colin
> > > >
> > >
> > >
> > > --
> > > David Arthur
> > >
> >
>


Re: [DISCUSSION] KIP-650: Enhance Kafkaesque Raft semantics

2020-12-06 Thread Unmesh Joshi
Hi Boyang,

Thanks for the KIP.
I think there are two aspects of linearizable read implementations in Raft.

1. Providing linearizable reads from the leader
   Even read requests served by the leader might not return the latest
committed results if the leader is partitioned. The leader needs to make
sure that it has not been partitioned and superseded by a new leader, so it
needs to send heartbeats and wait until it gets responses from a quorum of
followers before returning a response to read requests.
(I think this is the issue found in etcd and Consul with the Jepsen tests:
https://aphyr.com/posts/316-call-me-maybe-etcd-and-consul)
etcd has implemented a readIndex mechanism for this:
https://github.com/etcd-io/etcd/pull/6212/commits/e3e39930229830b2991ec917ec5d2ba625febd3f
(a sketch of this readIndex-style check follows at the end of this message)

2. Providing safe and strictly ordered reads from followers.
When reading from followers, it's important to make sure that external
clients reading from two different followers see results in strict order.
In ZooKeeper this is provided by sync requests. As described in the
ZooKeeper discussion below, sync won't provide linearizable reads, as it's
always possible that between a sync and the return of a read, a new entry
is committed on the leader.
https://mail-archives.apache.org/mod_mbox/zookeeper-user/201303.mbox/%3CCAJwFCa0Hoekc14Zy6i0LyLj=eraf8jimqmzadohokqjntmt...@mail.gmail.com%3E
The discussion in the Raft thesis about reads from followers is mainly
about making sure clients are not affected by partitioned followers and do
get the latest updates.
This is an important concern, and I think the KIP talks mostly about this.
It will probably be useful to describe the situation with a Kafka-specific
scenario rather than an Alice-and-Bob scenario.
e.g. A client or observer reading from a follower might never receive
TopicRecords and PartitionRecords when a topic is created on the controller
quorum and that follower is partitioned, or a producer/consumer might never
learn about a broker which is fenced.

It will also be good to mention which types of clients will be affected by
this scenario. Brokers in the Kafka cluster always need to talk to the
leader of the controller quorum to read metadata; because their heartbeats
and leases are tracked by the leader of the controller quorum, they cannot
talk to followers.
It's mostly producers and consumers, which need to read metadata from the
controller quorum, that might be affected by a partitioned follower.
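
(For illustration, a minimal sketch of the readIndex-style leader read
described in point 1 above -- all class and method names here are assumed,
not taken from any KIP or implementation: the leader captures its commit
index, confirms it is still the leader with a heartbeat round to a quorum,
waits until the state machine has applied up to that index, and only then
serves the read.)

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical sketch of a readIndex-based linearizable read on the leader. */
class ReadIndexSketch {
    interface Quorum {
        /** Sends a heartbeat round and completes once a majority has acknowledged. */
        CompletableFuture<Void> confirmLeadershipWithQuorum();
        /** Highest log offset known to be committed on this leader. */
        long commitIndex();
        /** Completes once the local state machine has applied entries up to the given offset. */
        CompletableFuture<Void> awaitApplied(long offset);
    }

    static <T> CompletableFuture<T> linearizableRead(Quorum quorum, Supplier<T> readFromStateMachine) {
        long readIndex = quorum.commitIndex();                      // 1. capture the commit index
        return quorum.confirmLeadershipWithQuorum()                 // 2. prove we are still the leader
                .thenCompose(ok -> quorum.awaitApplied(readIndex))  // 3. wait until applied >= readIndex
                .thenApply(ok -> readFromStateMachine.get());       // 4. now safe to serve the read
    }
}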

Thanks,
Unmesh





On Sat, Dec 5, 2020 at 3:25 AM Guozhang Wang  wrote:

> Hi Boyang,
>
> Thanks for raising this up. I have a few thoughts about the "Non-leader
> Linearizable Read", and I think there are two goals we can consider
> achieving here from a user's perspective:
>
> 1) that each of your queries on the raft log is always going to return the
> latest committed result.
>
> 2) that your consecutive queries would not "go back".
>
> And as we can see 1) is a stronger guarantee than 2), meaning that
> achieving 1) would always guarantee 2) as well.
>
> In your football example, in order for Alice and Bob to agree on each
> other, we would have to achieve 1) above; but practically it may be okay
> for Alice and Bob to see different results temporarily. However, for a
> single user like Alice, it is usually required that if she issued the query
> twice, and the first returns "final result 1:0", the second should not
> return "no final result yet". And to achieve this only 2) is needed. And it
> is easier to achieve 2) without the proposed new request/resp, for example,
> we can let the client associate its query with an offset which it got from
> the previous query result, and require whoever is answering that query to
> have applied the state machine at least up to that offset.
>
> If we consider in Kafka's case, I feel that in most scenarios just
> achieving 2) is good enough, e.g. for querying metadata. Today many of the
> error cases actually come from the fact that if you query two different
> brokers, your results may actually "go back in time". So I'd like to play
> devil's advocate here and ask us if achieving a stronger semantics is
> indeed needed in Kafka.
>
> Guozhang
>
>
>
> On Wed, Dec 2, 2020 at 10:17 AM Boyang Chen 
> wrote:
>
> > Hey there,
> >
> > I would like to start a discussion thread for a KIP to improve on our
> > existing Kafka Raft semantics, specifically adding pre-vote and
> > linearizable read:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-650%3A+Enhance+Kafkaesque+Raft+semantics
> >
> > Let me know what you think, thank you!
> >
>
>
> --
> -- Guozhang
>
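
(For illustration, a sketch of the offset-based "no going back" read that
Guozhang describes above -- the names are assumed and this is not actual
Kafka client code: the client remembers the offset returned by its previous
query and only accepts an answer from a server that has applied at least
that offset.)

/** Hypothetical sketch of a monotonic ("no going back") metadata read. */
class MonotonicReadSketch {
    /** A metadata answer tagged with the log offset it reflects. */
    record MetadataResult(String value, long appliedOffset) {}

    interface MetadataServer {
        long appliedOffset();                 // highest log offset applied locally
        MetadataResult answer(String query);  // answers from the local state machine
    }

    private long lastSeenOffset = -1L;        // remembered by the client across queries

    MetadataResult query(MetadataServer server, String query) throws InterruptedException {
        // Wait (or pick another server) until this one has caught up to what we already saw.
        while (server.appliedOffset() < lastSeenOffset) {
            Thread.sleep(10);
        }
        MetadataResult result = server.answer(query);
        lastSeenOffset = Math.max(lastSeenOffset, result.appliedOffset());
        return result;
    }
}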


Re: [VOTE] KIP-631: The Quorum-based Kafka Controller

2020-09-17 Thread Unmesh Joshi
Thanks for the KIP.

+1 (non-binding)

On Tue, Sep 15, 2020 at 12:23 AM Colin McCabe  wrote:

> Hi all,
>
> I'd like to call a vote on KIP-631: the quorum-based Kafka Controller.
> The KIP is here:
>
> https://cwiki.apache.org/confluence/x/4RV4CQ
>
> The DISCUSS thread is here:
>
>
> https://lists.apache.org/thread.html/r1ed098a88c489780016d963b065e8cb450a9080a4736457cd25f323c%40%3Cdev.kafka.apache.org%3E
>
> Please take a look and vote if you can.
>
> best,
> Colin
>


Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-09-16 Thread Unmesh Joshi
Thanks Colin, the changes look good to me. One small thing:
registration.lease.timeout.ms is a configuration on the controller side.
It will be good to comment on how brokers learn about it, so that they are
able to send LeaseDurationMs in the heartbeat request; alternatively, it
could be added to the heartbeat response so that brokers know about it.
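
(Purely illustrative -- the field names below are assumed, not taken from
KIP-631: the second option would mean the heartbeat response carries the
granted lease duration, so the broker never needs to know the
controller-side configuration.)

/** Hypothetical request/response shapes for the two options above. */
record BrokerHeartbeatRequestSketch(int brokerId, long brokerEpoch, long leaseDurationMs) {}
record BrokerHeartbeatResponseSketch(short errorCode, long leaseDurationMs) {}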

Thanks,
Unmesh

On Fri, Sep 11, 2020 at 10:32 PM Colin McCabe  wrote:

> Hi Unmesh,
>
> I think you're right that we should use a duration here rather than a
> time.  As you said, the clock on the controller will probably not match the
> one on the broker.  I have updated the KIP.
>
> > > It's important to keep in mind that messages may be delayed in the
> > > network, or arrive out of order.  When this happens, we will use the
> start
> > > time specified in the request to determine if the request is stale.
> > I am assuming that there will be a single TCP connection maintained
> between
> > broker and active controller. So, there won't be any out of order
> requests?
> > There will be a scenario of broker GC pause, which might cause connection
> > timeout and broker might need to reestablish the connection. If the pause
> > is too long, lease will expire and the heartbeat sent after the pause
> will
> > be treated as a new registration (similar to restart case), and a new
> epoch
> > number will be assigned to the broker.
>
> I agree with the end of this paragraph, but not with the start :)
>
> There can be out-of-order requests, since the broker will simply use a new
> TCP connection if the old one has problems.  This can happen for a variety
> of reasons.  I don't think GC pauses are the most common reason for this to
> happen.  It's more common to see issues in the network itself that
> result in connections getting dropped from time to time.
>
> So we have to assume that messages may arrive out of order, and possibly
> be delayed.  I added a note that heartbeat requests should be ignored if
> the metadata log offset they contain is smaller than a previous heartbeat.
>
> > When the active controller fails, the new active controller needs to be
> > sure that it considers all the known brokers as alive till the lease
> > expiration interval.  Because registration.lease.timeout.ms, is
> configured
> > on the controller, the new active controller will extend all the leases
> by
> > registration.lease.timeout.ms. I see that it won't need last heartbeat
> > time.
>
> Agreed.
>
> best,
> Colin
>
> >
> > Thanks,
> > Unmesh
> >
> > On Sat, Sep 5, 2020 at 1:28 AM Colin McCabe  wrote:
> >
> > > > Colin wrote:
> > > > > The reason for including LeaseStartTimeMs in the request is to
> ensure
> > > > > that the time required to communicate with the controller gets
> > > included in
> > > > > the lease time.  Since requests can potentially be delayed in the
> > > network
> > > > > for a long time, this is important.
> > >
> > > On Mon, Aug 31, 2020, at 05:58, Unmesh Joshi wrote:
> > > > The network time will be added anyway, because the lease timer on the
> > > > active controller will start only after the heartbeat request
> reaches the
> > > > server.
> > >
> > > Hi Unmesh,
> > >
> > > If the start time is not specified in the request, then the network
> time
> > > is excluded from the heartbeat time.
> > >
> > > Here's an example:
> > > Let's say broker A sends a heartbeat at time 100, and it arrives on the
> > > controller at time 200, and the lease duration is 1000.
> > >
> > > The controller looks at the start time in the request, which is 100,
> and
> > > adds 1000 to it, getting 1100.  On the other hand, if start time is not
> > > specified in the request, then the expiration will be at time 1200.
> > > That is what I mean by "the network time is included in the expiration
> > > time."
> > >
> > > > And I think, some assumption about network round trip time is
> > > > needed anyway to decide on the frequency of the heartbeat (
> > > > registration.heartbeat.interval.ms), and lease timeout (
> > > > registration.lease.timeout.ms). So I think just having a leaseTTL
> in the
> > > > request is easier to understand and implement.
> > >
> > > It's important to keep in mind that messages may be delayed in the
> > > network, or arrive out of order.  When this happens, we will use the
> start
> > > time specified in the request to determine if th

Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-09-04 Thread Unmesh Joshi
Hi Colin,
Thanks for the response.
>>If the start time is not specified in the request, then the network time
is excluded from the heartbeat time.
The most common implementation pattern I see (looking at Google Chubby
sessions, ZooKeeper sessions, the etcd lease implementation, LogCabin
sessions and Docker Swarm sessions) is that only durations are sent in
requests and responses, not actual time values. The actual times for the
lease expiration, as well as for sending the next heartbeat, are derived.
e.g. ZooKeeper calculates the heartbeat time on the client as roughly one
third of the session expiration interval it gets in the session response
(a read timeout of 2/3 of the session timeout, pinging at half of that).
This makes sure that even if the actual clock values on different servers
differ, it won't impact the lease expiration mechanism.
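
(An illustrative sketch of this duration-only pattern, with assumed names:
the controller grants a TTL and tracks expiry on its own monotonic clock,
while the broker schedules heartbeats at a fraction of the granted TTL on
its monotonic clock, so no wall-clock values ever cross the wire.)

/** Hypothetical sketch: only a TTL (duration) is exchanged, never wall-clock times. */
class LeaseTtlSketch {
    /** Controller side: expiry is tracked on the controller's monotonic clock. */
    static final class Lease {
        final long ttlMs;
        long expiryNanos;
        Lease(long ttlMs) { this.ttlMs = ttlMs; renew(); }
        void renew() { expiryNanos = System.nanoTime() + ttlMs * 1_000_000L; }
        boolean isExpired() { return System.nanoTime() > expiryNanos; }
    }

    /** Broker side: derive the heartbeat interval from the granted TTL (here one quarter),
     *  measuring elapsed time with its own monotonic clock. */
    static long heartbeatIntervalMs(long grantedTtlMs) {
        return grantedTtlMs / 4;
    }
}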

>>>It's important to keep in mind that messages may be delayed in the
network, or arrive out of order.  When this happens, we will use the start
>>time specified in the request to determine if the request is stale.
I am assuming that there will be a single TCP connection maintained between
the broker and the active controller, so there won't be any out-of-order
requests? There is the scenario of a broker GC pause, which might cause a
connection timeout, and the broker might need to re-establish the
connection. If the pause is too long, the lease will expire and the
heartbeat sent after the pause will be treated as a new registration
(similar to the restart case), and a new epoch number will be assigned to
the broker.

>>>so the need to re-establish them after a controller failover doesn't
seem like a big problem
When the active controller fails, the new active controller needs to be
sure that it considers all the known brokers as alive until the lease
expiration interval has passed.  Because registration.lease.timeout.ms is
configured on the controller, the new active controller will extend all the
leases by registration.lease.timeout.ms. I see that it won't need the last
heartbeat time.

Thanks,
Unmesh

On Sat, Sep 5, 2020 at 1:28 AM Colin McCabe  wrote:

> > Colin wrote:
> > > The reason for including LeaseStartTimeMs in the request is to ensure
> > > that the time required to communicate with the controller gets
> included in
> > > the lease time.  Since requests can potentially be delayed in the
> network
> > > for a long time, this is important.
>
> On Mon, Aug 31, 2020, at 05:58, Unmesh Joshi wrote:
> > The network time will be added anyway, because the lease timer on the
> > active controller will start only after the heartbeat request reaches the
> > server.
>
> Hi Unmesh,
>
> If the start time is not specified in the request, then the network time
> is excluded from the heartbeat time.
>
> Here's an example:
> Let's say broker A sends a heartbeat at time 100, and it arrives on the
> controller at time 200, and the lease duration is 1000.
>
> The controller looks at the start time in the request, which is 100, and
> adds 1000 to it, getting 1100.  On the other hand, if start time is not
> specified in the request, then the expiration will be at time 1200.
> That is what I mean by "the network time is included in the expiration
> time."
>
> > And I think, some assumption about network round trip time is
> > needed anyway to decide on the frequency of the heartbeat (
> > registration.heartbeat.interval.ms), and lease timeout (
> > registration.lease.timeout.ms). So I think just having a leaseTTL in the
> > request is easier to understand and implement.
>
> It's important to keep in mind that messages may be delayed in the
> network, or arrive out of order.  When this happens, we will use the start
> time specified in the request to determine if the request is stale.
>
> > > Yes, I agree that the lease timeout on the controller side should be
> > > reset in the case of controller failover.  The alternative would be to
> > > track the lease as hard state rather than soft state, but I think that
> > > is not really needed, and would result in more log entries.
> > My interpretation of the mention of BrokerRecord in the KIP was that this
> > record exists in the Raft log.
>
> BrokerRecord does exist in the Raft log, but does not include the last
> heartbeat time.
>
> > By soft state, do you mean the broker
> > records exist only on the active leader and will not be replicated in the
> > raft log? If the live brokers list is maintained only on the active
> > controller (raft leader), then, in case of leader failure, there will be
> a
> > window where the new leader does not know about the live brokers, till
> the
> > brokers establish the leases again.
> > I think it will be safer to have leases as a hard state managed by
> standard
> > Ra

Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-09-01 Thread Unmesh Joshi
>>Yes, I agree that the lease timeout on the controller side should be
reset. The alternative would be to track the lease as hard state rather
than soft state, but I think that is not really needed, and would result
in more log entries.
On a related note, I think it will be good to add some details on how
leases are maintained in case of active controller (Raft leader) failure.
e.g. I have attached a few diagrams assuming leases are maintained with log
entries.

On Mon, Aug 31, 2020 at 6:28 PM Unmesh Joshi  wrote:

> >>The reason for including LeaseStartTimeMs in the request is to ensure
> that the time required to communicate with the controller gets included in
> the lease time.  Since requests can potentially be delayed in the network
> for a long time, this is important.
> The network time will be added anyway, because the lease timer on the
> active controller will start only after the heartbeat request reaches the
> server. And I think, some assumption about network round trip time is
> needed anyway to decide on the frequency of the heartbeat (
> registration.heartbeat.interval.ms), and lease timeout (
> registration.lease.timeout.ms). So I think just having a leaseTTL in the
> request is easier to understand and implement.
> >>>Yes, I agree that the lease timeout on the controller side should be
> reset in the case of controller failover.  The alternative would be to
> track the lease as hard state rather than soft state, but I think that
> is not really needed, and would result in more log entries.
> My interpretation of the mention of BrokerRecord in the KIP was that this
> record exists in the Raft log. By soft state, do you mean the broker
> records exist only on the active leader and will not be replicated in the
> raft log? If the live brokers list is maintained only on the active
> controller (raft leader), then, in case of leader failure, there will be a
> window where the new leader does not know about the live brokers, till the
> brokers establish the leases again.
> I think it will be safer to have leases as a hard state managed by
> standard Raft replication.
> Or am I misunderstanding something? (I assume that with soft state, you
> mean something like zookeeper local sessions
> https://issues.apache.org/jira/browse/ZOOKEEPER-1147.)
>
> >>Our code is single threaded as well.  I think it makes sense for the
> controller, since otherwise locking becomes very messy.  I'm not sure I
> understand your question about duplicate broker ID detection, though.
> There's a section in the KIP about this -- is there a detail we should add
> there?
> I assumed broker leases are implemented as a hard state. In that case, to
> check for broker id conflict, we need to check the broker ids at two places
> 1. Pending broker registrations (which are yet to be committed) 2. Already
> committed broker registrations.
>
> Thanks,
> Unmesh
>
>
>
> On Mon, Aug 31, 2020 at 5:42 PM Colin McCabe  wrote:
>
>> On Sat, Aug 29, 2020, at 01:12, Unmesh Joshi wrote:
>> > >>>Can you repeat your questions about broker leases?
>> >
>> > >>>>The LeaseStartTimeMs is expected to be the broker's
>> > 'System.currentTimeMillis()' at the point of the request. The active
>> > controller will add its lease period to this in order >>>>to compute the
>> > LeaseEndTimeMs.
>> >
>> > I think the use of LeaseStartTimeMs and LeaseEndTimeMs in the KIP is a
>> > bit
>> > confusing.  Monotonic Clock (System.nanoTime) on the active controller
>> > should be used to track leases.
>> > (For example,
>> >
>> https://issues.apache.org/jira/browse/ZOOKEEPER-1616
>> https://github.com/etcd-io/etcd/pull/6888/commits/e7f4010ccaf28b6ce64fe514d25a4b2fa459d114
>> > )
>> >
>> > Then we will not need LeaseStartTimeMs?
>> > Instead of LeaseStartTimeMs, can we call it LeaseTTL? The active
>> controller
>> > can then calculate LeaseEndTime = System.nanoTime() + LeaseTTL.
>> > In this case we might just drop LeaseEndTimeMs from the response, as the
>> > broker already knows about the TTL and can send heartbeats at some
>> fraction
>> > of TTL, say every TTL/4 milliseconds.(elapsed time on the broker
>> measured
>> > by System.nanoTime)
>> >
>>
>> Hi Unmesh,
>>
>> I agree that the monotonic clock is probably a better idea here.  It is
>> good to be robust against wall clock changes, although I think a cluster
>> which had them might suffer other issues.  I will change it to specify a
>> monotonic clock.
>>
>> The reason

Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-08-31 Thread Unmesh Joshi
>>The reason for including LeaseStartTimeMs in the request is to ensure
that the time required to communicate with the controller gets included in
the lease time.  Since requests can potentially be delayed in the network
for a long time, this is important.
The network time will be added anyway, because the lease timer on the
active controller will start only after the heartbeat request reaches the
server. And I think, some assumption about network round trip time is
needed anyway to decide on the frequency of the heartbeat (
registration.heartbeat.interval.ms), and lease timeout (
registration.lease.timeout.ms). So I think just having a leaseTTL in the
request is easier to understand and implement.
>>>Yes, I agree that the lease timeout on the controller side should be
reset in the case of controller failover.  The alternative would be to
track the lease as hard state rather than soft state, but I think that
is not really needed, and would result in more log entries.
My interpretation of the mention of BrokerRecord in the KIP was that this
record exists in the Raft log. By soft state, do you mean the broker
records exist only on the active leader and will not be replicated in the
raft log? If the live brokers list is maintained only on the active
controller (raft leader), then, in case of leader failure, there will be a
window where the new leader does not know about the live brokers, till the
brokers establish the leases again.
I think it will be safer to have leases as a hard state managed by standard
Raft replication.
Or am I misunderstanding something? (I assume that with soft state, you
mean something like zookeeper local sessions
https://issues.apache.org/jira/browse/ZOOKEEPER-1147.)

>>Our code is single threaded as well.  I think it makes sense for the
controller, since otherwise locking becomes very messy.  I'm not sure I
understand your question about duplicate broker ID detection, though.
There's a section in the KIP about this -- is there a detail we should add
there?
I assumed broker leases are implemented as hard state. In that case, to
check for a broker id conflict, we need to check the broker ids in two
places: 1. pending broker registrations (which are yet to be committed) and
2. already committed broker registrations.

Thanks,
Unmesh



On Mon, Aug 31, 2020 at 5:42 PM Colin McCabe  wrote:

> On Sat, Aug 29, 2020, at 01:12, Unmesh Joshi wrote:
> > >>>Can you repeat your questions about broker leases?
> >
> > >>>>The LeaseStartTimeMs is expected to be the broker's
> > 'System.currentTimeMillis()' at the point of the request. The active
> > controller will add its lease period to this in order >>>>to compute the
> > LeaseEndTimeMs.
> >
> > I think the use of LeaseStartTimeMs and LeaseEndTimeMs in the KIP is a
> > bit
> > confusing.  Monotonic Clock (System.nanoTime) on the active controller
> > should be used to track leases.
> > (For example,
> >
> https://issues.apache.org/jira/browse/ZOOKEEPER-1616
> https://github.com/etcd-io/etcd/pull/6888/commits/e7f4010ccaf28b6ce64fe514d25a4b2fa459d114
> > )
> >
> > Then we will not need LeaseStartTimeMs?
> > Instead of LeaseStartTimeMs, can we call it LeaseTTL? The active
> controller
> > can then calculate LeaseEndTime = System.nanoTime() + LeaseTTL.
> > In this case we might just drop LeaseEndTimeMs from the response, as the
> > broker already knows about the TTL and can send heartbeats at some
> fraction
> > of TTL, say every TTL/4 milliseconds.(elapsed time on the broker measured
> > by System.nanoTime)
> >
>
> Hi Unmesh,
>
> I agree that the monotonic clock is probably a better idea here.  It is
> good to be robust against wall clock changes, although I think a cluster
> which had them might suffer other issues.  I will change it to specify a
> monotonic clock.
>
> The reason for including LeaseStartTimeMs in the request is to ensure that
> the time required to communicate with the controller gets included in the
> lease time.  Since requests can potentially be delayed in the network for a
> long time, this is important.
>
> >
> > I have a prototype built to demonstrate this as following:
> >
> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala
> >
> > The Kip631Controller itself depends on a Consensus module, to demonstrate
> > how possible interactions with the consensus module will look like
> >  (The Consensus can be pluggable really, with an API to allow reading
> > replicated log upto HighWaterMark)
> >
> > It has an implementation of LeaseTracker
> >
> https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/s

Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-08-29 Thread Unmesh Joshi
>>>Can you repeat your questions about broker leases?

>>>>The LeaseStartTimeMs is expected to be the broker's
'System.currentTimeMillis()' at the point of the request. The active
controller will add its lease period to this in order to compute the
LeaseEndTimeMs.

I think the use of LeaseStartTimeMs and LeaseEndTimeMs in the KIP is a bit
confusing.  Monotonic Clock (System.nanoTime) on the active controller
should be used to track leases.
(For example,
https://issues.apache.org/jira/browse/ZOOKEEPER-1616
https://github.com/etcd-io/etcd/pull/6888/commits/e7f4010ccaf28b6ce64fe514d25a4b2fa459d114
)

Then we will not need LeaseStartTimeMs?
Instead of LeaseStartTimeMs, can we call it LeaseTTL? The active controller
can then calculate LeaseEndTime = System.nanoTime() + LeaseTTL.
In this case we might just drop LeaseEndTimeMs from the response, as the
broker already knows about the TTL and can send heartbeats at some fraction
of the TTL, say every TTL/4 milliseconds (elapsed time on the broker
measured by System.nanoTime).

I have a prototype built to demonstrate this as follows:
https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala

The Kip631Controller itself depends on a Consensus module, to demonstrate
what the possible interactions with the consensus module will look like
 (the consensus module can really be pluggable, with an API to allow reading
the replicated log up to the HighWaterMark).

It has an implementation of LeaseTracker
https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/LeaderLeaseTracker.scala
to demonstrate LeaseTracker's interaction with the consensus module.

The implementation has the following aspects:
1. The lease tracking happens only on the active controller (raft leader)
2. Once the lease expires, it needs to propose and commit a FenceBroker
record for that lease.
3. In case of active controller failure, the lease will be tracked by the
newly elected Raft leader. The new Raft leader starts the lease timer again
(as implemented in the onBecomingLeader method of
https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala
),
in effect extending the lease by the time spent in the leader election and
whatever time had elapsed on the old leader.
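
(A simplified, illustrative sketch of the three aspects above -- not the
actual prototype code, and the class and record names are assumed: lease
timers run only on the active controller, an expired lease turns into a
committed FenceBroker record, and a newly elected leader simply restarts
all timers, which extends every lease by the failover time.)

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical lease tracker run only on the active controller (Raft leader). */
class LeaseTrackerSketch {
    interface Log { void proposeAndCommit(String record); }  // stand-in for the Raft module

    private final Map<Integer, Long> leaseExpiryNanos = new ConcurrentHashMap<>(); // brokerId -> expiry
    private final long leaseTtlNanos;
    private final Log log;

    LeaseTrackerSketch(long leaseTtlMs, Log log) {
        this.leaseTtlNanos = leaseTtlMs * 1_000_000L;
        this.log = log;
    }

    /** 1. Heartbeats renew the lease, but only while this node is the active controller. */
    void onHeartbeat(int brokerId) {
        leaseExpiryNanos.put(brokerId, System.nanoTime() + leaseTtlNanos);
    }

    /** 2. A periodic check turns each expired lease into a committed FenceBroker record. */
    void checkExpiredLeases() {
        long now = System.nanoTime();
        leaseExpiryNanos.forEach((brokerId, expiry) -> {
            if (now > expiry) {
                log.proposeAndCommit("FenceBroker{brokerId=" + brokerId + "}");
                leaseExpiryNanos.remove(brokerId);
            }
        });
    }

    /** 3. On becoming leader after a failover, restart every known broker's timer,
     *     in effect extending the leases by the election time. */
    void onBecomingLeader(Iterable<Integer> knownBrokerIds) {
        long expiry = System.nanoTime() + leaseTtlNanos;
        for (int brokerId : knownBrokerIds) {
            leaseExpiryNanos.put(brokerId, expiry);
        }
    }
}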

There are working tests for this implementation here:
https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/kip500/Kip631ControllerTest.scala
and an end-to-end test here:
https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/ProducerConsumerKIP500Test.scala
>>I'm not sure what you mean by "de-duplication of the broker."  Can you
give a little more context?
Apologies for using the confusing term deduplication. I meant broker id
conflict.
As you can see in the prototype handleRequest of KIP631Controller
<https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala>,
the duplicate broker id needs to be detected before the BrokerRecord is
submitted to the raft module.
Also, as implemented in the prototype, the KIP631Controller is single
threaded, handling requests one at a time (an example of
https://martinfowler.com/articles/patterns-of-distributed-systems/singular-update-queue.html
).
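
(An illustrative sketch of the single-threaded handling just described,
with assumed names -- not the prototype code: registration requests are
drained from a queue by one thread, and a registration is rejected when its
broker id clashes with either a pending, not-yet-committed registration or
an already committed one.)

import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical singular-update-queue controller checking for broker id conflicts. */
class RegistrationQueueSketch implements Runnable {
    record Registration(int brokerId) {}

    private final BlockingQueue<Registration> requests = new LinkedBlockingQueue<>();
    private final Set<Integer> pending = ConcurrentHashMap.newKeySet();    // proposed, not yet committed
    private final Set<Integer> committed = ConcurrentHashMap.newKeySet();  // applied from the Raft log

    void submit(Registration r) { requests.add(r); }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Registration r = requests.take();  // a single thread handles requests one at a time
                if (pending.contains(r.brokerId()) || committed.contains(r.brokerId())) {
                    continue;  // duplicate broker id: reject before anything is proposed to Raft
                }
                pending.add(r.brokerId());
                // ...propose a BrokerRecord to the Raft module here; move the id from
                // 'pending' to 'committed' once the record is committed and applied.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}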

Thanks,
Unmesh

On Sat, Aug 29, 2020 at 10:49 AM Colin McCabe  wrote:

> On Fri, Aug 28, 2020, at 19:36, Unmesh Joshi wrote:
> > Hi Colin,
> >
> > There were a few of questions I had..
>
> Hi Unmesh,
>
> Thanks for the response.
>
> >
> > 1. Were my comments on the broker lease implementation (and corresponding
> > prototype) appropriate and do we need to change the KIP
> > description accordingly?.
> >
>
> Can you repeat your questions about broker leases?
>
> >
> > 2. How will broker epochs be generated? I am assuming it can be the
> > committed log offset (like zxid?)
> >
>
> There isn't any need to use a log offset.  We can just look at an
> in-memory hash table and see what the latest number is, and add one, to
> generate a new broker epoch.
>
> >
> > 3. How will producer registration happen? I am assuming it should be
> > similar to broker registration, with a similar way to generate producer
> id.
> >
>
> For the EOS stuff, we will need a few new RPCs to the controller.  I think
> we should do that in a follow-on KIP, though, since this one is already
> pretty big.
>
> >
> > 4. Because we expose Raft log to all the brokers, any de-duplication of
> the
> > broker needs to happen before the requests are proposed 

Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-08-28 Thread Unmesh Joshi
Hi Colin,

There were a few questions I had:
1. Were my comments on the broker lease implementation (and the
corresponding prototype) appropriate, and do we need to change the KIP
description accordingly?
2. How will broker epochs be generated? I am assuming it can be the
committed log offset (like zxid?).
3. How will producer registration happen? I am assuming it should be
similar to broker registration, with a similar way to generate the producer
id.
4. Because we expose the Raft log to all the brokers, any de-duplication of
brokers needs to happen before the requests are proposed to Raft. For this
the controller needs to be single threaded, and should do validation
against the in-process or pending requests and the final state. I read a
mention of this in the responses in this thread. Will it be useful to
mention this in the KIP?

Thanks,
Unmesh

On Sat, Aug 29, 2020 at 4:50 AM Colin McCabe  wrote:

> Hi all,
>
> I'm thinking of calling a vote on KIP-631 on Monday.  Let me know if
> there's any more comments I should address before I start the vote.
>
> cheers,
> Colin
>
> On Tue, Aug 11, 2020, at 05:39, Unmesh Joshi wrote:
> > >>Hi Unmesh,
> > >>Thanks, I'll take a look.
> > Thanks. I will be adding more to the prototype and will be happy to help
> > and collaborate.
> >
> > Thanks,
> > Unmesh
> >
> > On Tue, Aug 11, 2020 at 12:28 AM Colin McCabe 
> wrote:
> >
> > > Hi Jose,
> > >
> > > That's a good point that I hadn't considered.  It's probably worth
> having
> > > a separate leader change message, as you mentioned.
> > >
> > > Hi Unmesh,
> > >
> > > Thanks, I'll take a look.
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Fri, Aug 7, 2020, at 11:56, Jose Garcia Sancio wrote:
> > > > Hi Unmesh,
> > > >
> > > > Very cool prototype!
> > > >
> > > > Hi Colin,
> > > >
> > > > The KIP proposes a record called IsrChange which includes the
> > > > partition, topic, isr, leader and leader epoch. During normal
> > > > operation ISR changes do not result in leader changes. Similarly,
> > > > leader changes do not necessarily involve ISR changes. The controller
> > > > implementation that uses ZK modeled them together because
> > > > 1. All of this information is stored in one znode.
> > > > 2. ZK's optimistic lock requires that you specify the new value
> > > completely
> > > > 3. The change to that znode was being performed by both the
> controller
> > > > and the leader.
> > > >
> > > > None of these reasons are true in KIP-500. Have we considered having
> > > > two different records? For example
> > > >
> > > > 1. IsrChange record which includes topic, partition, isr
> > > > 2. LeaderChange record which includes topic, partition, leader and
> > > leader epoch.
> > > >
> > > > I suspect that making this change will also require changing the
> > > > message AlterIsrRequest introduced in KIP-497: Add inter-broker API
> to
> > > > alter ISR.
> > > >
> > > > Thanks
> > > > -Jose
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-08-11 Thread Unmesh Joshi
>>Hi Unmesh,
>>Thanks, I'll take a look.
Thanks. I will be adding more to the prototype and will be happy to help
and collaborate.

Thanks,
Unmesh

On Tue, Aug 11, 2020 at 12:28 AM Colin McCabe  wrote:

> Hi Jose,
>
> That's a good point that I hadn't considered.  It's probably worth having
> a separate leader change message, as you mentioned.
>
> Hi Unmesh,
>
> Thanks, I'll take a look.
>
> best,
> Colin
>
>
> On Fri, Aug 7, 2020, at 11:56, Jose Garcia Sancio wrote:
> > Hi Unmesh,
> >
> > Very cool prototype!
> >
> > Hi Colin,
> >
> > The KIP proposes a record called IsrChange which includes the
> > partition, topic, isr, leader and leader epoch. During normal
> > operation ISR changes do not result in leader changes. Similarly,
> > leader changes do not necessarily involve ISR changes. The controller
> > implementation that uses ZK modeled them together because
> > 1. All of this information is stored in one znode.
> > 2. ZK's optimistic lock requires that you specify the new value
> completely
> > 3. The change to that znode was being performed by both the controller
> > and the leader.
> >
> > None of these reasons are true in KIP-500. Have we considered having
> > two different records? For example
> >
> > 1. IsrChange record which includes topic, partition, isr
> > 2. LeaderChange record which includes topic, partition, leader and
> leader epoch.
> >
> > I suspect that making this change will also require changing the
> > message AlterIsrRequest introduced in KIP-497: Add inter-broker API to
> > alter ISR.
> >
> > Thanks
> > -Jose
> >
>


Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-08-05 Thread Unmesh Joshi
Hi,
I have built a small prototype of what a controller backed by a consensus
implementation will look like:
https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/Kip631Controller.scala
This is a miniature Kafka-like implementation I use to teach distributed
systems at ThoughtWorks, and also to experiment with my work on patterns of
distributed systems
<https://martinfowler.com/articles/patterns-of-distributed-systems/>.
It has a working end-to-end test to demonstrate how the overall system
might possibly look:
https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/ProducerConsumerKIP500Test.scala
The Kip631Controller itself depends on a Consensus module, to demonstrate
what the possible interactions with the consensus module will look like
 (the consensus module can really be pluggable, with an API to allow reading
the replicated log up to the HighWaterMark).
As of now I do not have a complete state transition implementation for the
broker states, but it has an implementation of LeaseTracker
https://github.com/unmeshjoshi/distrib-broker/blob/master/src/main/scala/com/dist/simplekafka/kip500/LeaderLeaseTracker.scala
to demonstrate the LeaseTracker's interaction with the consensus module (it
needs to propose and commit a FenceBroker record for expired leases).
I am hoping to add more specific tests for the broker state transitions here:
https://github.com/unmeshjoshi/distrib-broker/blob/master/src/test/scala/com/dist/simplekafka/kip500/Kip631ControllerTest.scala

I hope it will be helpful in driving some discussions and doing quick
experimentation outside of the Kafka codebase.

Thanks,
Unmesh






On Tue, Aug 4, 2020 at 7:52 AM Unmesh Joshi  wrote:

> Hi,
>
> >>>>The LeaseStartTimeMs is expected to be the broker's
> 'System.currentTimeMillis()' at the point of the request. The active
> controller will add its lease period to this in order >>>>to compute
> the LeaseEndTimeMs.
> I think this is a bit confusing.  Monotonic clock on the active controller
> should be used to track leases.
> For example,
> https://issues.apache.org/jira/browse/ZOOKEEPER-1616
>
> https://github.com/etcd-io/etcd/pull/6888/commits/e7f4010ccaf28b6ce64fe514d25a4b2fa459d114
>
> Then we will not need LeaseStartTimeMs?
> Instead of LeaseStartTimeMs, can we call it LeaseTTL? The active
> controller can then calculate LeaseEndTime = System.nanoTime() + LeaseTTL.
> In this case we might just drop LeaseEndTimeMs from the response, as the
> broker already knows about the TTL and can send heartbeats at some fraction
> of TTL, say every TTL/4 milliseconds.(elapsed time on the broker measured
> by System.nanoTime)
>
> Thanks,
> Unmesh
>
>
>
>
>
> On Tue, Aug 4, 2020 at 4:48 AM Colin McCabe  wrote:
>
>> On Mon, Aug 3, 2020, at 15:51, Jose Garcia Sancio wrote:
>> > Thanks for the KIP Colin,
>> >
>> > Here is a partial review:
>> >
>> > > 1. Even when a broker and a controller are co-located in the same
>> JVM, they must
>> > > have different node IDs
>> >
>> > Why? What problem are you trying to solve?
>> >
>>
>> Hi Jose,
>>
>> Thanks for the comments.
>>
>> We did talk about this a bit earlier in the thread.  The controller is
>> always on its own port, even when it is running in the same JVM as a
>> broker.  So it would not make sense to share the same ID here-- your
>> messages would not get through to the controller if you sent them to the
>> broker port instead.  And clearly if the controller is on a separate host
>> from any broker, it can't share a broker id.
>>
>> >
>> > > 2. Node IDs must be set in the configuration file for brokers and
>> controllers.
>> >
>> > I understand that controller IDs must be static and in the
>> > configuration file to be able to generate consensus in KIP-595. Why
>> > are the broker nodes which are observers in KIP-595 cannot discover
>> > their ID on first boot and persist their ID for consistency in future
>> > restarts?
>> >
>>
>> This is discussed in the rejected alternatives section.  Basically, node
>> ID auto-assignment is complicated and antiquated in the age of Kubernetes,
>> Chef, Puppet, Ansible, etc.
>>
>> >
>> > > 3. Controller processes will listen on a separate endpoint from
>> brokers
>> >
>> > Why is this? Kafka supports multi endpoints. For example, one broker
>> > can have one endpoint for listening to connections by other brokers
>> > and another endpoint for connections from admin, producer and consumer
>> > clients.
>> >
>>
>> The reason for having 

Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-08-03 Thread Unmesh Joshi
Hi,

The LeaseStartTimeMs is expected to be the broker's
'System.currentTimeMillis()' at the point of the request. The active
controller will add its lease period to this in order to compute
the LeaseEndTimeMs.
I think this is a bit confusing.  Monotonic clock on the active controller
should be used to track leases.
For example,
https://issues.apache.org/jira/browse/ZOOKEEPER-1616
https://github.com/etcd-io/etcd/pull/6888/commits/e7f4010ccaf28b6ce64fe514d25a4b2fa459d114

Then we will not need LeaseStartTimeMs?
Instead of LeaseStartTimeMs, can we call it LeaseTTL? The active controller
can then calculate LeaseEndTime = System.nanoTime() + LeaseTTL.
In this case we might just drop LeaseEndTimeMs from the response, as the
broker already knows about the TTL and can send heartbeats at some fraction
of TTL, say every TTL/4 milliseconds.(elapsed time on the broker measured
by System.nanoTime)

Thanks,
Unmesh





On Tue, Aug 4, 2020 at 4:48 AM Colin McCabe  wrote:

> On Mon, Aug 3, 2020, at 15:51, Jose Garcia Sancio wrote:
> > Thanks for the KIP Colin,
> >
> > Here is a partial review:
> >
> > > 1. Even when a broker and a controller are co-located in the same JVM,
> they must
> > > have different node IDs
> >
> > Why? What problem are you trying to solve?
> >
>
> Hi Jose,
>
> Thanks for the comments.
>
> We did talk about this a bit earlier in the thread.  The controller is
> always on its own port, even when it is running in the same JVM as a
> broker.  So it would not make sense to share the same ID here-- your
> messages would not get through to the controller if you sent them to the
> broker port instead.  And clearly if the controller is on a separate host
> from any broker, it can't share a broker id.
>
> >
> > > 2. Node IDs must be set in the configuration file for brokers and
> controllers.
> >
> > I understand that controller IDs must be static and in the
> > configuration file to be able to generate consensus in KIP-595. Why
> > are the broker nodes which are observers in KIP-595 cannot discover
> > their ID on first boot and persist their ID for consistency in future
> > restarts?
> >
>
> This is discussed in the rejected alternatives section.  Basically, node
> ID auto-assignment is complicated and antiquated in the age of Kubernetes,
> Chef, Puppet, Ansible, etc.
>
> >
> > > 3. Controller processes will listen on a separate endpoint from brokers
> >
> > Why is this? Kafka supports multi endpoints. For example, one broker
> > can have one endpoint for listening to connections by other brokers
> > and another endpoint for connections from admin, producer and consumer
> > clients.
> >
>
> The reason for having separate ports is discussed in KIP-590.  Basically,
> it is so that control plane traffic can be isolated from data plane
> traffic, as much as possible.  This is the existing situation with
> ZooKeeper.  Since ZK is on a separate port, the client cannot disrupt the
> cluster by flooding it with traffic (unless the admin has unwisely exposed
> all internal ports, but this would cause bigger security issues).  We want
> to preserve this property.
>
> > > 4. In the case of controller RPCs like AlterIsr, the controller
> handles this by not sending
> > > back a response until the designated change has been persisted.
> >
> > Should we enumerate these RPCs? For example, we also have
> > `ListPartitionReassignments` which is a read operation and goes
> > directly to the controller. The naive solution would be to return the
> > uncommitted state in the controller.
> >
>
> Hmm.  The KIP says that the "active controller must not make [...] future
> state "visible" to the rest of the cluster until it has been made
> persistent."  So we don't return uncommitted state for read operations.
>
> >
> > 5.
> > This KIP mentions a topic named __kafka_metadata. KIP-595 and KIP-630
> > mention a partition named __cluster_metadata. We should reconcile this
> > difference.
> >
>
> Jason did write that the name of the controller topic is "not a formal
> part of [the KIP-595] proposal."  I think he wanted to avoid having the
> discussion about the topic name in two different KIPs.  :)
>
> Let's discuss the metadata topic name here in KIP-631, and update KIP-595
> as required once this one is accepted.
>
> best,
> Colin
>
> >
> > --
> > -Jose
> >
>


Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-08-03 Thread Unmesh Joshi
 RPC from Fetch, pros and
> > cons.
> >
>
> The UpdateMetadata RPC is not used in the post-KIP-500 world.  This is
> mentioned later in KIP-631 where it says that "we will no longer need to
> send out LeaderAndIsrRequest, UpdateMetadataRequest, and StopReplicaRequest"
>
> We could combine the heartbeat with the fetch request.  It would basically
> mean moving all the heartbeat fields into the fetch request.  As the KIP
> says, this would be pretty messy.  Another reason why it would be messy is
> because of the timing.  Fetch requests can get delayed when they're
> fetching a lot of data.  If this delays heartbeats then it could cause
> brokers to get fenced unnecessarily.  This is something that we've gone
> back and forth about, but overall I think it's good to at least implement
> the simple thing first.
>
> best,
> Colin
>
> >
> > Boyang
> >
> > On Wed, Jul 15, 2020 at 5:30 PM Colin McCabe  wrote:
> >
> > > On Mon, Jul 13, 2020, at 11:08, Boyang Chen wrote:
> > > > Hey Colin, some quick questions,
> > > >
> > > > 1. I looked around and didn't find a config for broker heartbeat
> > > interval,
> > > > are we piggy-back on some existing configs?
> > > >
> > >
> > > Good point.  I meant to add this, but I forgot.  I added
> > > registration.heartbeat.interval.ms in the table.
> > >
> > > >
> > > > 2. We only mentioned that the lease time is 10X of the heartbeat
> > > interval,
> > > > could we also include why we chose this value?
> > > >
> > >
> > > I will add registration.lease.timeout.ms so that this can be set
> > > separately from registration.heartbeat.interval.ms.  The choice of
> value
> > > is a balance between not timing out brokers too soon, and not keeping
> > > unavailable brokers around too long.
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > > On Mon, Jul 13, 2020 at 10:09 AM Jason Gustafson  >
> > > wrote:
> > > >
> > > > > Hi Colin,
> > > > >
> > > > > Thanks for the proposal. A few initial comments comments/questions
> > > below:
> > > > >
> > > > > 1. I don't follow why we need a separate configuration for
> > > > > `controller.listeners`. The current listener configuration already
> > > allows
> > > > > users to specify multiple listeners, which allows them to define
> > > internal
> > > > > endpoints that are not exposed to clients. Can you explain what
> the new
> > > > > configuration gives us that we don't already have?
> > > > > 2. What is the advantage of creating a separate `controller.id`
> > > instead of
> > > > > just using `broker.id`?
> > > > > 3. It sounds like you are imagining a stop-the-world approach to
> > > > > snapshotting, which is why we need the controller micromanaging
> > > snapshots
> > > > > on all followers. Alternatives include fuzzy snapshots which can be
> > > done
> > > > > concurrently. If this has been rejected, can you add some detail
> about
> > > why?
> > > > > 4. More of a nit, but should `DeleteBrokerRecord` be
> > > > > `ShutdownBrokerRecord`? The broker is just getting removed from
> ISRs,
> > > but
> > > > > it would still be present in the replica set (I assume).
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > > > On Sun, Jul 12, 2020 at 12:24 AM Colin McCabe 
> > > wrote:
> > > > >
> > > > > > Hi Unmesh,
> > > > > >
> > > > > > That's an interesting idea, but I think it would be best to
> strive
> > > for
> > > > > > single metadata events that are complete in themselves, rather
> than
> > > > > trying
> > > > > > to do something transactional or EOS-like.  For example, we could
> > > have a
> > > > > > create event that contains all the partitions to be created.
> > > > > >
> > > > > > best,
> > > > > > Colin
> > > > > >
> > > > > >
> > > > > > On Fri, Jul 10, 2020, at 04:12, Unmesh Joshi wrote:
> > > > > > > I was thinking that we might need something like
> multi-operation
> > >

Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-07-29 Thread Unmesh Joshi
 networking section that "The only time when
> clients
> > should contact a controller node directly is when they are debugging
> system
> > issues". But later we didn't talk about how to enable this debug mode,
> > could you consider getting a section about that?
> >
> > 2. “When the active controller decides that a standby controller should
> > start a snapshot, it will communicate that information in its response to
> > the periodic heartbeat sent by that node.“ In the KIP-595, we provide an
> > RPC called `EndQuorumEpoch` which would transfer the leadership role to a
> > dedicated successor, do you think we could reuse that method instead of
> > piggy-backing on the heartbeat RPC?
> >
> > 3. The `DeleteBroker` record is listed but not mentioned in details for
> the
> > KIP. Are we going to support removing a broker in runtime, or this record
> > is just for the sake of removing an obsolete broker due to heartbeat
> > failure?
> >
> > 4. In the rejected alternatives, we mentioned we don't want to combine
> > heartbeats and fetch and listed out the reason was due to extra
> complexity.
> > However, we should also mention some cons caused by this model, for
> example
> > we are doing 2X round trips to maintain a liveness, where as a regular
> > follower it should always send out fetch, for sure. If we are combining
> the
> > two, what are the heartbeat request fields we need to populate in the
> Fetch
> > protocol to make it work? Could we piggy-back on the UpdateMetadata RPC
> to
> > propagate the broker state change for listeners separately to the
> > controller? I'm not buying either approach here, just hope we could list
> > out more reasoning for separating the heartbeat RPC from Fetch, pros and
> > cons.
> >
> > Boyang
> >
> > On Wed, Jul 15, 2020 at 5:30 PM Colin McCabe  wrote:
> >
> > > On Mon, Jul 13, 2020, at 11:08, Boyang Chen wrote:
> > > > Hey Colin, some quick questions,
> > > >
> > > > 1. I looked around and didn't find a config for broker heartbeat
> > > interval,
> > > > are we piggy-back on some existing configs?
> > > >
> > >
> > > Good point.  I meant to add this, but I forgot.  I added
> > > registration.heartbeat.interval.ms in the table.
> > >
> > > >
> > > > 2. We only mentioned that the lease time is 10X of the heartbeat
> > > interval,
> > > > could we also include why we chose this value?
> > > >
> > >
> > > I will add registration.lease.timeout.ms so that this can be set
> > > separately from registration.heartbeat.interval.ms.  The choice of
> value
> > > is a balance between not timing out brokers too soon, and not keeping
> > > unavailable brokers around too long.
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > > On Mon, Jul 13, 2020 at 10:09 AM Jason Gustafson  >
> > > wrote:
> > > >
> > > > > Hi Colin,
> > > > >
> > > > > Thanks for the proposal. A few initial comments comments/questions
> > > below:
> > > > >
> > > > > 1. I don't follow why we need a separate configuration for
> > > > > `controller.listeners`. The current listener configuration already
> > > allows
> > > > > users to specify multiple listeners, which allows them to define
> > > internal
> > > > > endpoints that are not exposed to clients. Can you explain what the
> > new
> > > > > configuration gives us that we don't already have?
> > > > > 2. What is the advantage of creating a separate `controller.id`
> > > instead of
> > > > > just using `broker.id`?
> > > > > 3. It sounds like you are imagining a stop-the-world approach to
> > > > > snapshotting, which is why we need the controller micromanaging
> > > snapshots
> > > > > on all followers. Alternatives include fuzzy snapshots which can be
> > > done
> > > > > concurrently. If this has been rejected, can you add some detail
> > about
> > > why?
> > > > > 4. More of a nit, but should `DeleteBrokerRecord` be
> > > > > `ShutdownBrokerRecord`? The broker is just getting removed from
> ISRs,
> > > but
> > > > > it would still be present in the replica set (I assume).
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > >

Re: [DISCUSS] KIP-595: A Raft Protocol for the Metadata Quorum

2020-07-27 Thread Unmesh Joshi
I just checked the etcd and ZooKeeper code, and both support the leader
stepping down to a follower to make sure there are no two leaders if the
leader has been disconnected from the majority of the followers.
For etcd this is https://github.com/etcd-io/etcd/issues/3866
For ZooKeeper it's https://issues.apache.org/jira/browse/ZOOKEEPER-1699
I was just wondering whether it would be difficult to implement in the
pull-based model, but I guess not. It is possibly the same way the ISR list
is managed currently: if the leader of the controller quorum loses a
majority of the followers, it should step down and become a follower, that
way telling clients in time that it was disconnected from the quorum, and
not keep on serving stale metadata to clients.
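
(A minimal, illustrative check-quorum sketch with assumed names -- not
etcd, ZooKeeper or Kafka code: the leader records when it last heard from
each follower, and if it has not heard from a majority within an
election-timeout window it steps down to follower instead of continuing to
serve metadata.)

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical check-quorum logic run periodically on the leader. */
class CheckQuorumSketch {
    private final int voterCount;                          // total voters, including the leader
    private final long electionTimeoutNanos;
    private final Map<Integer, Long> lastAckNanos = new ConcurrentHashMap<>(); // followerId -> last response

    CheckQuorumSketch(int voterCount, long electionTimeoutMs) {
        this.voterCount = voterCount;
        this.electionTimeoutNanos = electionTimeoutMs * 1_000_000L;
    }

    void onFollowerResponse(int followerId) {
        lastAckNanos.put(followerId, System.nanoTime());
    }

    /** Returns true if the leader has lost contact with a majority and should step down. */
    boolean shouldStepDown() {
        long cutoff = System.nanoTime() - electionTimeoutNanos;
        long recentAcks = lastAckNanos.values().stream().filter(t -> t >= cutoff).count();
        int majority = voterCount / 2 + 1;
        return recentAcks + 1 < majority;  // +1 counts the leader itself
    }
}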

Thanks,
Unmesh


On Mon, Jul 27, 2020 at 9:31 AM Unmesh Joshi  wrote:

> >>Could you clarify on this question? Which part of the raft group doesn't
> >>know about leader dis-connection?
> The leader of the controller quorum is partitioned from the controller
> cluster, and a different leader is elected for the remaining controller
> cluster.
> I think there are two things here,
> 1.  The old leader will not know if it's disconnected from the rest of the
> controller quorum cluster unless it receives BeginQuorumEpoch from the new
> leader. So it will keep on serving stale metadata to the clients (Brokers,
> Producers and Consumers)
> 2. I assume, the Broker Leases will be managed on the controller quorum
> leader. This partitioned leader will keep on tracking broker leases it has,
> while the new leader of the quorum will also start managing broker leases.
> So while the quorum leader is partitioned, there will be two membership
> views of the kafka brokers managed on two leaders.
> Unless broker heartbeats are also replicated as part of the Raft log,
> there is no way to solve this?
> I know LogCabin implementation does replicate client heartbeats. I suspect
> that the same issue is there in Zookeeper, which does not replicate client
> Ping requests..
>
> Thanks,
> Unmesh
>
>
>
> On Mon, Jul 27, 2020 at 6:23 AM Boyang Chen 
> wrote:
>
>> Thanks for the questions Unmesh!
>>
>> On Sun, Jul 26, 2020 at 6:18 AM Unmesh Joshi 
>> wrote:
>>
>> > Hi,
>> >
>> > In the FetchRequest Handling, how to make sure we handle scenarios where
>> > the leader might have been disconnected from the cluster, but doesn't
>> know
>> > yet?
>> >
>> Could you clarify on this question? Which part of the raft group doesn't
>> know about leader
>> dis-connection?
>>
>>
>> > As discussed in the Raft Thesis section 6.4, the linearizable semantics
>> of
>> > read requests is implemented in LogCabin by sending heartbeat to
>> followers
>> > and waiting till the heartbeats are successful to make sure that the
>> leader
>> > is still the leader.
>> > I think for the controller quorum to make sure none of the consumers get
>> > stale data, it's important to have linearizable semantics? In the pull
>> > based model, the leader will need to wait for heartbeats from the
>> followers
>> > before returning each fetch request from the consumer then? Or do we
>> need
>> > to introduce some other request?
>> > (Zookeeper does not have linearizable semantics for read requests, but
>> as
>> > of now all the kafka interactions are through writes and watches).
>> >
>> > This is a very good question. For our v1 implementation we are not
>> aiming
>> to guarantee linearizable read, which
>> would be considered as a follow-up effort. Note that today in Kafka there
>> is no guarantee on the metadata freshness either,
>> so no regression is introduced.
>>
>>
>> > Thanks,
>> > Unmesh
>> >
>> > On Fri, Jul 24, 2020 at 11:36 PM Jun Rao  wrote:
>> >
>> > > Hi, Jason,
>> > >
>> > > Thanks for the reply.
>> > >
>> > > 101. Sounds good. Regarding clusterId, I am not sure storing it in the
>> > > metadata log is enough. For example, the vote request includes
>> clusterId.
>> > > So, no one can vote until they know the clusterId. Also, it would be
>> > useful
>> > > to support the case when a voter completely loses its disk and needs
>> to
>> > > recover.
>> > >
>> > > 210. There is no longer a FindQuorum request. When a follower
>> restarts,
>> > how
>> > > does it discover the leader? Is that based on DescribeQuorum? It
>> would be
>> > > useful to document this.
>> > >
>> > > Jun
>

Re: [DISCUSS] KIP-595: A Raft Protocol for the Metadata Quorum

2020-07-26 Thread Unmesh Joshi
>>Could you clarify on this question? Which part of the raft group doesn't
>>know about leader dis-connection?
The leader of the controller quorum is partitioned from the controller
cluster, and a different leader is elected for the remaining controller
cluster.
I think there are two things here:
1. The old leader will not know that it is disconnected from the rest of the
controller quorum cluster unless it receives BeginQuorumEpoch from the new
leader. So it will keep serving stale metadata to the clients (brokers,
producers and consumers).
2. I assume the broker leases will be managed on the controller quorum
leader. The partitioned leader will keep tracking the broker leases it has,
while the new leader of the quorum will also start managing broker leases.
So while the old quorum leader is partitioned, there will be two membership
views of the Kafka brokers, managed on two leaders.
Unless broker heartbeats are also replicated as part of the Raft log, there
is no way to solve this, is there?
I know the LogCabin implementation does replicate client heartbeats. I suspect
the same issue exists in ZooKeeper, which does not replicate client ping
requests.
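
To make the idea concrete, here is a minimal sketch, with hypothetical record
and class names that are not part of the KIP, of what replicating broker
heartbeats through the metadata log could look like. The lease takes effect
only once the record is committed, so a partitioned former leader cannot keep
extending leases on its own.

// Hypothetical sketch: broker liveness derived from committed metadata log
// records instead of from timers local to whichever controller is leader.
class BrokerHeartbeatRecord {
    final int brokerId;
    final long brokerEpoch;
    final long leaseExpiryTimeMs; // the lease is extended up to this time

    BrokerHeartbeatRecord(int brokerId, long brokerEpoch, long leaseExpiryTimeMs) {
        this.brokerId = brokerId;
        this.brokerEpoch = brokerEpoch;
        this.leaseExpiryTimeMs = leaseExpiryTimeMs;
    }
}

interface MetadataLog {
    long append(Object record); // returns the offset of the appended record
}

class BrokerLeaseManager {
    private final MetadataLog log;

    BrokerLeaseManager(MetadataLog log) {
        this.log = log;
    }

    // The active controller appends a heartbeat record for the broker; the
    // lease extension only becomes effective once the record is committed by
    // the quorum, which a partitioned former leader can no longer do.
    long handleHeartbeat(int brokerId, long brokerEpoch, long nowMs, long leaseDurationMs) {
        return log.append(new BrokerHeartbeatRecord(brokerId, brokerEpoch, nowMs + leaseDurationMs));
    }
}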

Thanks,
Unmesh



On Mon, Jul 27, 2020 at 6:23 AM Boyang Chen 
wrote:

> Thanks for the questions Unmesh!
>
> On Sun, Jul 26, 2020 at 6:18 AM Unmesh Joshi 
> wrote:
>
> > Hi,
> >
> > In the FetchRequest Handling, how to make sure we handle scenarios where
> > the leader might have been disconnected from the cluster, but doesn't
> know
> > yet?
> >
> Could you clarify on this question? Which part of the raft group doesn't
> know about leader
> dis-connection?
>
>
> > As discussed in the Raft Thesis section 6.4, the linearizable semantics
> of
> > read requests is implemented in LogCabin by sending heartbeat to
> followers
> > and waiting till the heartbeats are successful to make sure that the
> leader
> > is still the leader.
> > I think for the controller quorum to make sure none of the consumers get
> > stale data, it's important to have linearizable semantics? In the pull
> > based model, the leader will need to wait for heartbeats from the
> followers
> > before returning each fetch request from the consumer then? Or do we need
> > to introduce some other request?
> > (Zookeeper does not have linearizable semantics for read requests, but as
> > of now all the kafka interactions are through writes and watches).
> >
> > This is a very good question. For our v1 implementation we are not aiming
> to guarantee linearizable read, which
> would be considered as a follow-up effort. Note that today in Kafka there
> is no guarantee on the metadata freshness either,
> so no regression is introduced.
>
>
> > Thanks,
> > Unmesh
> >
> > On Fri, Jul 24, 2020 at 11:36 PM Jun Rao  wrote:
> >
> > > Hi, Jason,
> > >
> > > Thanks for the reply.
> > >
> > > 101. Sounds good. Regarding clusterId, I am not sure storing it in the
> > > metadata log is enough. For example, the vote request includes
> clusterId.
> > > So, no one can vote until they know the clusterId. Also, it would be
> > useful
> > > to support the case when a voter completely loses its disk and needs to
> > > recover.
> > >
> > > 210. There is no longer a FindQuorum request. When a follower restarts,
> > how
> > > does it discover the leader? Is that based on DescribeQuorum? It would
> be
> > > useful to document this.
> > >
> > > Jun
> > >
> > > On Fri, Jul 17, 2020 at 2:15 PM Jason Gustafson 
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Thanks for the questions.
> > > >
> > > > 101. I am treating some of the bootstrapping problems as out of the
> > scope
> > > > of this KIP. I am working on a separate proposal which addresses
> > > > bootstrapping security credentials specifically. Here is a rough
> sketch
> > > of
> > > > how I am seeing it:
> > > >
> > > > 1. Dynamic broker configurations including encrypted passwords will
> be
> > > > persisted in the metadata log and cached in the broker's
> > > `meta.properties`
> > > > file.
> > > > 2. We will provide a tool which allows users to directly override the
> > > > values in `meta.properties` without requiring access to the quorum.
> > This
> > > > can be used to bootstrap the credentials of the voter set itself
> before
> > > the
> > > > cluster has been started.
> > > > 3. Some dynamic config changes will only

Re: [DISCUSS] KIP-595: A Raft Protocol for the Metadata Quorum

2020-07-26 Thread Unmesh Joshi
Hi,

In the FetchRequest handling, how do we make sure we handle scenarios where
the leader might have been disconnected from the cluster but doesn't know it
yet?
As discussed in section 6.4 of the Raft thesis, linearizable semantics for
read requests is implemented in LogCabin by sending heartbeats to the
followers and waiting until the heartbeats succeed, to make sure that the
leader is still the leader.
I think that for the controller quorum to make sure none of the consumers get
stale data, it's important to have linearizable semantics. In the pull-based
model, will the leader then need to wait for heartbeats from the followers
before returning each fetch request to the consumer? Or do we need to
introduce some other request?
(ZooKeeper does not have linearizable semantics for read requests, but as of
now all the Kafka interactions are through writes and watches.)
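
To illustrate the LogCabin-style approach from section 6.4, here is a rough
sketch with hypothetical names (I am not suggesting this is what the KIP
proposes): the leader notes its current high watermark, confirms its
leadership with a round of heartbeats to a majority, and only then serves the
read from state up to that offset.

import java.util.concurrent.CompletableFuture;
import java.util.function.LongFunction;

// Hypothetical sketch of a "read index" style linearizable read, as described
// in section 6.4 of the Raft thesis.
interface QuorumLeader {
    long highWatermark();
    // Completes successfully only if a majority acknowledged a heartbeat while
    // this node was still the leader for its epoch.
    CompletableFuture<Void> confirmLeadershipWithHeartbeat();
}

class LinearizableReader {
    private final QuorumLeader leader;

    LinearizableReader(QuorumLeader leader) {
        this.leader = leader;
    }

    // Serve the read only after confirming leadership, and only from state
    // that includes everything up to the high watermark noted at the start.
    <T> CompletableFuture<T> read(LongFunction<T> readStateUpTo) {
        long readIndex = leader.highWatermark();
        return leader.confirmLeadershipWithHeartbeat()
                     .thenApply(v -> readStateUpTo.apply(readIndex));
    }
}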

Thanks,
Unmesh

On Fri, Jul 24, 2020 at 11:36 PM Jun Rao  wrote:

> Hi, Jason,
>
> Thanks for the reply.
>
> 101. Sounds good. Regarding clusterId, I am not sure storing it in the
> metadata log is enough. For example, the vote request includes clusterId.
> So, no one can vote until they know the clusterId. Also, it would be useful
> to support the case when a voter completely loses its disk and needs to
> recover.
>
> 210. There is no longer a FindQuorum request. When a follower restarts, how
> does it discover the leader? Is that based on DescribeQuorum? It would be
> useful to document this.
>
> Jun
>
> On Fri, Jul 17, 2020 at 2:15 PM Jason Gustafson 
> wrote:
>
> > Hi Jun,
> >
> > Thanks for the questions.
> >
> > 101. I am treating some of the bootstrapping problems as out of the scope
> > of this KIP. I am working on a separate proposal which addresses
> > bootstrapping security credentials specifically. Here is a rough sketch
> of
> > how I am seeing it:
> >
> > 1. Dynamic broker configurations including encrypted passwords will be
> > persisted in the metadata log and cached in the broker's
> `meta.properties`
> > file.
> > 2. We will provide a tool which allows users to directly override the
> > values in `meta.properties` without requiring access to the quorum. This
> > can be used to bootstrap the credentials of the voter set itself before
> the
> > cluster has been started.
> > 3. Some dynamic config changes will only be allowed when a broker is
> > online. For example, changing a truststore password dynamically would
> > prevent that broker from being able to start if it were offline when the
> > change was made.
> > 4. I am still thinking a little bit about SCRAM credentials, but most
> > likely they will be handled with an approach similar to
> `meta.properties`.
> >
> > 101.3 As for the question about `clusterId`, I think the way we would do
> > this is to have the first elected leader generate a UUID and write it to
> > the metadata log. Let me add some detail to the proposal about this.
> >
> > A few additional answers below:
> >
> > 203. Yes, that is correct.
> >
> > 204. That is a good question. What happens in this case is that all
> voters
> > advance their epoch to the one designated by the candidate even if they
> > reject its vote request. Assuming the candidate fails to be elected, the
> > election will be retried until a leader emerges.
> >
> > 205. I had some discussion with Colin offline about this problem. I think
> > the answer should be "yes," but it probably needs a little more thought.
> > Handling JBOD failures is tricky. For an observer, we can replicate the
> > metadata log from scratch safely in a new log dir. But if the log dir of
> a
> > voter fails, I do not think it is generally safe to start from an empty
> > state.
> >
> > 206. Yes, that is discussed in KIP-631 I believe.
> >
> > 207. Good suggestion. I will work on this.
> >
> >
> > Thanks,
> > Jason
> >
> >
> >
> >
> >
> > On Thu, Jul 16, 2020 at 3:44 PM Jun Rao  wrote:
> >
> > > Hi, Jason,
> > >
> > > Thanks for the updated KIP. Looks good overall. A few more comments
> > below.
> > >
> > > 101. I still don't see a section on bootstrapping related issues. It
> > would
> > > be useful to document if/how the following is supported.
> > > 101.1 Currently, we support auto broker id generation. Is this
> supported
> > > for bootstrap brokers?
> > > 101.2 As Colin mentioned, sometimes we may need to load the security
> > > credentials to be broker before it can be connected to. Could you
> > provide a
> > > bit more detail on how this will work?
> > > 101.3 Currently, we use ZK to generate clusterId on a new cluster. With
> > > Raft, how does every broker generate the same clusterId in a
> distributed
> > > way?
> > >
> > > 200. It would be useful to document if the various special offsets (log
> > > start offset, recovery point, HWM, etc) for the Raft log are stored in
> > the
> > > same existing checkpoint files or not.
> > > 200.1 Since the Raft log flushes every append, does that allow us to
> > > recover from a recovery point within the active segment 

Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-07-10 Thread Unmesh Joshi
I was thinking that we might need something like a multi-operation record
(similar to ZooKeeper's multi, https://issues.apache.org/jira/browse/ZOOKEEPER-965)
to atomically create the topic and partition records when this multi record is
committed. This way the metadata will always have both the TopicRecord and the
PartitionRecord together, and in no situation can we have a TopicRecord without
a PartitionRecord. Not sure if there are other situations where multi-operation
is needed.
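
As an illustration only, assuming a hypothetical atomic-batch append API on
the metadata log (not something the KIP defines), the controller could hand
both records to the log in one batch so that either both become visible or
neither does.

import java.util.List;

// Hypothetical sketch: both records for a new topic are appended as a single
// atomic batch, so readers never observe a TopicRecord without its
// PartitionRecord.
interface AtomicMetadataLog {
    // Appends all records as one batch; returns the offset of the last record.
    long appendAtomically(List<Object> records);
}

class TopicCreator {
    private final AtomicMetadataLog log;

    TopicCreator(AtomicMetadataLog log) {
        this.log = log;
    }

    long createTopic(Object topicRecord, Object partitionRecord) {
        // Either both records are committed, or neither is.
        return log.appendAtomically(List.of(topicRecord, partitionRecord));
    }
}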

Thanks,
Unmesh

On Fri, Jul 10, 2020 at 11:32 AM Colin McCabe  wrote:

> Hi Unmesh,
>
> Yes, once the last stable offset advanced, we would consider the topic
> creation to be done, and then we could return success to the client.
>
> best,
> Colin
>
> On Thu, Jul 9, 2020, at 19:44, Unmesh Joshi wrote:
> > It still needs HighWaterMark / LastStableOffset to be advanced by two
> > records? Something like following?
> >
> >
> >                 |                 |
> >   Response <--- |                 |   <-- HighWaterMark
> >                 | PartitionRecord |
> >                 |-----------------|
> >                 | TopicRecord     |
> >   CreateTopic ->|-----------------|   <-- Previous HighWaterMark
> >                 |                 |
> >                 |                 |
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On Fri, Jul 10, 2020 at 1:30 AM Colin McCabe  wrote:
> >
> > > On Thu, Jul 9, 2020, at 04:37, Unmesh Joshi wrote:
> > > > I see that, when a new topic is created, two metadata records, a
> > > > TopicRecord (just the name and id of the topic) and a PartitionRecord
> > > (more
> > > > like LeaderAndIsr, with leader id and replica ids for the partition)
> are
> > > > created.
> > > > While creating the topic, log entries for both the records need to be
> > > > committed in RAFT core. Will it need something like a
> > > MultiOperationRecord
> > > > in zookeeper. Then, we can have a single log entry with both the
> records,
> > > > and  the create topic request can be fulfilled atomically when both
> the
> > > > records are committed?
> > > >
> > >
> > > Hi Unmesh,
> > >
> > > Since the active controller is the only node writing to the log, there
> is
> > > no need for any kind of synchronization or access control at the log
> level.
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > > Thanks,
> > > > Unmesh
> > > >
> > > > On Wed, Jul 8, 2020 at 6:57 AM Ron Dagostino 
> wrote:
> > > >
> > > > > HI Colin.  Thanks for the KIP.  Here is some feedback and various
> > > > > questions.
> > > > >
> > > > > "*Controller processes will listen on a separate port from brokers.
> > > This
> > > > > will be true even when the broker and controller are co-located in
> the
> > > same
> > > > > JVM*". I assume it is possible that the port numbers could be the
> same
> > > when
> > > > > using separate JVMs (i.e. broker uses port 9192 and controller also
> > > uses
> > > > > port 9192).  I think it would be clearer to state this along these
> > > > > lines: "Controller
> > > > > nodes will listen on a port, and the controller port must differ
> from
> > > any
> > > > > port that a broker in the same JVM is listening on.  In other
> words, a
> > > > > controller and a broker node, when in the same JVM, do not share
> ports"
> > > > >
> > > > > I think the sentence "*In the realm of ACLs, this translates to
> > > controllers
> > > > > requiring CLUSTERACTION on CLUSTER for all operations*" is
> confusing.
> > > It
> > > > > feels to me that you can just delete it.  Am I missing something
> here?
> > > > >
> > > > > The KIP states "*The metadata will be stored in memory on all the
> > > active
> > > > > controllers.*"  Can there be multiple active controllers?  Should
> it
> > > > > instead read "The metadata will be stored in memory on all
> p

Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-07-09 Thread Unmesh Joshi
It still needs the HighWaterMark / LastStableOffset to be advanced by two
records? Something like the following?


                    |                 |
    Response <---   |                 |   <-- HighWaterMark
                    | PartitionRecord |
                    |-----------------|
                    | TopicRecord     |
    CreateTopic --> |-----------------|   <-- Previous HighWaterMark
                    |                 |
                    |                 |
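
In other words, a rough sketch with hypothetical names: the CreateTopic
response is completed only once the high watermark / last stable offset has
moved past the offset of the last of the two records.

import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: the response is completed only once the high watermark
// has advanced past the offset of the last record written for the request.
class DeferredCreateTopicResponse {
    private final long lastRecordOffset; // offset of the PartitionRecord
    private final CompletableFuture<Void> done = new CompletableFuture<>();

    DeferredCreateTopicResponse(long lastRecordOffset) {
        this.lastRecordOffset = lastRecordOffset;
    }

    // Called whenever the high watermark / last stable offset advances.
    void onHighWatermarkAdvance(long newHighWatermark) {
        if (!done.isDone() && newHighWatermark > lastRecordOffset) {
            done.complete(null); // both records are now committed and visible
        }
    }

    CompletableFuture<Void> future() {
        return done;
    }
}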











On Fri, Jul 10, 2020 at 1:30 AM Colin McCabe  wrote:

> On Thu, Jul 9, 2020, at 04:37, Unmesh Joshi wrote:
> > I see that, when a new topic is created, two metadata records, a
> > TopicRecord (just the name and id of the topic) and a PartitionRecord
> (more
> > like LeaderAndIsr, with leader id and replica ids for the partition) are
> > created.
> > While creating the topic, log entries for both the records need to be
> > committed in RAFT core. Will it need something like a
> MultiOperationRecord
> > in zookeeper. Then, we can have a single log entry with both the records,
> > and  the create topic request can be fulfilled atomically when both the
> > records are committed?
> >
>
> Hi Unmesh,
>
> Since the active controller is the only node writing to the log, there is
> no need for any kind of synchronization or access control at the log level.
>
> best,
> Colin
>
> >
> > Thanks,
> > Unmesh
> >
> > On Wed, Jul 8, 2020 at 6:57 AM Ron Dagostino  wrote:
> >
> > > HI Colin.  Thanks for the KIP.  Here is some feedback and various
> > > questions.
> > >
> > > "*Controller processes will listen on a separate port from brokers.
> This
> > > will be true even when the broker and controller are co-located in the
> same
> > > JVM*". I assume it is possible that the port numbers could be the same
> when
> > > using separate JVMs (i.e. broker uses port 9192 and controller also
> uses
> > > port 9192).  I think it would be clearer to state this along these
> > > lines: "Controller
> > > nodes will listen on a port, and the controller port must differ from
> any
> > > port that a broker in the same JVM is listening on.  In other words, a
> > > controller and a broker node, when in the same JVM, do not share ports"
> > >
> > > I think the sentence "*In the realm of ACLs, this translates to
> controllers
> > > requiring CLUSTERACTION on CLUSTER for all operations*" is confusing.
> It
> > > feels to me that you can just delete it.  Am I missing something here?
> > >
> > > The KIP states "*The metadata will be stored in memory on all the
> active
> > > controllers.*"  Can there be multiple active controllers?  Should it
> > > instead read "The metadata will be stored in memory on all potential
> > > controllers." (or something like that)?
> > >
> > > KIP-595 states "*we have assumed the name __cluster_metadata for this
> > > topic, but this is not a formal part of this proposal*".  This KIP-631
> > > states "*Metadata changes need to be persisted to the __metadata log
> before
> > > we propagate them to the other nodes in the cluster.  This means
> waiting
> > > for the metadata log's last stable offset to advance to the offset of
> the
> > > change.*"  Are we here formally defining "__metadata" as the topic
> name,
> > > and should these sentences refer to "__metadata topic" rather than
> > > "__metadata log"?  What are the "other nodes in the cluster" that are
> > > referred to?  These are not controller nodes but brokers, right?  If
> so,
> > > then should we say "before we propagate them to the brokers"?
> Technically
> > > we have a controller cluster and a broker cluster -- two separate
> clusters,
> > > correct?  (Even though we could potentially share JVMs and therefore
> > > require no additional processes.). If the statement is referring to
> nodes
> > > in both clusters then maybe we should state "before we propagate them
> to
> > > the other nodes in the controller cluster or to brokers."
> > >
> > > "*The controller may have several of these uncommitted changes in
> flight at
> > > any given time.  In essence, the controller's in-memory state is
> always a
> > > little bit in the futur

Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-07-09 Thread Unmesh Joshi
I see that when a new topic is created, two metadata records are created: a
TopicRecord (just the name and id of the topic) and a PartitionRecord (more
like LeaderAndIsr, with the leader id and replica ids for the partition).
While creating the topic, log entries for both records need to be committed
in the Raft core. Will it need something like a MultiOperationRecord, as in
ZooKeeper? Then we can have a single log entry with both records, and the
create-topic request can be fulfilled atomically when both records are
committed?
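
For reference, this is roughly what an atomic multi operation looks like with
ZooKeeper's client API. This is only to illustrate the analogy; the znode
paths below are made up, and this is not how the current controller writes
topic metadata.

import java.util.Arrays;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Illustration of ZooKeeper's atomic multi operation: either both znodes are
// created, or neither is.
class MultiOpExample {
    static List<OpResult> createTopicZnodes(ZooKeeper zk, byte[] topicData, byte[] partitionData)
            throws KeeperException, InterruptedException {
        return zk.multi(Arrays.asList(
                Op.create("/example/topics/my-topic", topicData,
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                Op.create("/example/topics/my-topic/partitions", partitionData,
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)));
    }
}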

Thanks,
Unmesh

On Wed, Jul 8, 2020 at 6:57 AM Ron Dagostino  wrote:

> HI Colin.  Thanks for the KIP.  Here is some feedback and various
> questions.
>
> "*Controller processes will listen on a separate port from brokers.  This
> will be true even when the broker and controller are co-located in the same
> JVM*". I assume it is possible that the port numbers could be the same when
> using separate JVMs (i.e. broker uses port 9192 and controller also uses
> port 9192).  I think it would be clearer to state this along these
> lines: "Controller
> nodes will listen on a port, and the controller port must differ from any
> port that a broker in the same JVM is listening on.  In other words, a
> controller and a broker node, when in the same JVM, do not share ports"
>
> I think the sentence "*In the realm of ACLs, this translates to controllers
> requiring CLUSTERACTION on CLUSTER for all operations*" is confusing.  It
> feels to me that you can just delete it.  Am I missing something here?
>
> The KIP states "*The metadata will be stored in memory on all the active
> controllers.*"  Can there be multiple active controllers?  Should it
> instead read "The metadata will be stored in memory on all potential
> controllers." (or something like that)?
>
> KIP-595 states "*we have assumed the name __cluster_metadata for this
> topic, but this is not a formal part of this proposal*".  This KIP-631
> states "*Metadata changes need to be persisted to the __metadata log before
> we propagate them to the other nodes in the cluster.  This means waiting
> for the metadata log's last stable offset to advance to the offset of the
> change.*"  Are we here formally defining "__metadata" as the topic name,
> and should these sentences refer to "__metadata topic" rather than
> "__metadata log"?  What are the "other nodes in the cluster" that are
> referred to?  These are not controller nodes but brokers, right?  If so,
> then should we say "before we propagate them to the brokers"?  Technically
> we have a controller cluster and a broker cluster -- two separate clusters,
> correct?  (Even though we could potentially share JVMs and therefore
> require no additional processes.). If the statement is referring to nodes
> in both clusters then maybe we should state "before we propagate them to
> the other nodes in the controller cluster or to brokers."
>
> "*The controller may have several of these uncommitted changes in flight at
> any given time.  In essence, the controller's in-memory state is always a
> little bit in the future compared to the current state.  This allows the
> controller to continue doing things while it waits for the previous changes
> to be committed to the Raft log.*"  Should the three references above be to
> the active controller rather than just the controller?
>
> "*Therefore, the controller must not make this future state "visible" to
> the rest of the cluster until it has been made persistent – that is, until
> it becomes current state*". Again I wonder if this should refer to "active"
> controller, and indicate "anyone else" as opposed to "the rest of the
> cluster" since we are talking about 2 clusters here?
>
> "*When the active controller decides that it itself should create a
> snapshot, it will first try to give up the leadership of the Raft quorum.*"
> Why?  Is it necessary to state this?  It seems like it might be an
> implementation detail rather than a necessary constraint/requirement that
> we declare publicly and would have to abide by.
>
> "*It will reject brokers whose metadata is too stale*". Why?  An example
> might be helpful here.
>
> "*it may lose subsequent conflicts if its broker epoch is stale*" This is
> the first time a "broker epoch" is mentioned.  I am assuming it is the
> controller epoch communicated to it (if any).  It would be good to
> introduce it/explicitly state what it is before referring to it.
>
> Ron
>
> On Tue, Jul 7, 2020 at 6:48 PM Colin McCabe  wrote:
>
> > Hi all,
> >
> > I posted a KIP about how the quorum-based controller envisioned in
> KIP-500
> > will work.  Please take a look here:
> > https://cwiki.apache.org/confluence/x/4RV4CQ
> >
> > best,
> > Colin
> >
>


Using zookeeper vs gossip protocols in Kafka

2016-06-01 Thread Unmesh Joshi
Hi,

I see that Kafka uses ZooKeeper for group membership and leader election.
I am curious to know whether the Kafka developers ever discussed moving away
from ZooKeeper and using gossip-based protocols. Is there any specific
advantage of using ZooKeeper over a gossip-based implementation, or vice versa?

Thanks,
Unmesh