Re: [DISCUSS] KIP-874: TopicRoundRobinAssignor

2022-10-24 Thread Mathieu Amblard
Hi Sophie,

Thank you for your message.

The TopicRoundRobinAssignor has already been implemented (because the
existing ones do not address our use cases correctly). It implements the
interface you mentioned, and I have made a PR to the Kafka repo where you
can see the source code. It was in a comment on that PR that someone from
the core team proposed creating a KIP.
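
For context, the interface in question is
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor. A custom
assignor has roughly the following shape (a simplified sketch, not the
actual PR source; the assignment body and the name below are illustrative):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;

public class TopicRoundRobinAssignor implements ConsumerPartitionAssignor {

    @Override
    public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
        Map<String, Assignment> perMember = new HashMap<>();
        // Real logic: group partitions by topic and deal whole topics out to
        // the subscribed members (see the PR for the actual algorithm).
        groupSubscription.groupSubscription().forEach((memberId, subscription) ->
            perMember.put(memberId, new Assignment(new ArrayList<>())));
        return new GroupAssignment(perMember);
    }

    @Override
    public String name() {
        return "topicroundrobin"; // illustrative; the PR defines the real name
    }
}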

I have also fully unit tested this assignor (see the PR). Moreover, around
20 microservices have been using it in production for 6 months now at the
company I work for. So I think it has proven itself, and that is why I have
made this proposal. I would never have dared to suggest something I had
never tested, or something for which there is no need.

Finally, this assignor suits our Kafka architecture and use cases well,
unlike the existing ones. As a reminder, we are using Kafka as a messaging
system (not for streaming), and we allow only one type of message per
topic. Maybe that is too specific, maybe not; that is also why I have made
this KIP: to challenge the idea, and to see whether someone has the same
needs and whether this assignor can help others. If not, that is not a
problem; I will simply keep it in our source code.

Regards,
Mathieu


On Tue, Oct 25, 2022 at 01:51, Sophie Blee-Goldman wrote:

> Hey Mathieu,
>
> Apologies if you already know this, but the partition assignor interface is
> fully pluggable.
> That means you can plug in a custom PartitionAssignor implementation,
> without
> having to go through a KIP or commit any code to the AK repo.
>
> I suspect some of the current unknowns and solutions will become clear when
> you start
> to actually write this assignor and test it out in your environment. Then
> you can play around
> with what works and what doesn't work, and come back to the KIP if desired
> with a stronger
> argument for why it's needed. Or you can just take your assignor and
> publish it in a public
> git repo for anyone who might have a similar use case as you.
>
> Just my two cents, I'd recommend in this case you start with the
> implementation before
> worrying about donating your assignor back to the main repo with a KIP. If
> you do want to,
> it would then be much easier to convince people when they can see your
> assignor logic
> for themselves, and you'll be able to answer any questions.
>
> Best,
> Sophie
>
> On Fri, Oct 21, 2022 at 2:21 AM Mathieu Amblard  >
> wrote:
>
> > Hello everybody,
> >
> > Just to let you know that I have added a chapter about having multiple
> > containers (multiple pods for Kubernetes) running the same application :
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-874%3A+TopicRoundRobinAssignor#KIP874:TopicRoundRobinAssignor-Howdoesitworkifwehavemultiplecontainersrunningthesameapplication
> > ?
> >
> > Regards,
> > Mathieu
> >
> > On Tue, Oct 11, 2022 at 20:00, Mathieu Amblard wrote:
> >
> > > Hi Hector,
> > >
> > >
> > >
> > > First, thank you for your questions !
> > >
> > >
> > >
> > > *If the goal is to do the partition assignments at a topic level,
> > wouldn't
> > > having single-partition topics solve this problem?*
> > >
> > >
> > >
> > > We are in a microservices environment; therefore, we can have multiple
> > > containers running the same application.
> > >
> > > Using the TopicRoundRobinAssignor, the partitions are uniformly
> balanced
> > > to each container, and we can get better performance.
> > >
> > > Let's suppose there are 2 instances of the same application A0 and A1, 2
> > > consumers C0 and C1, and two topics t0 and t1. t0 has 3 partitions and
> t1
> > > has two partitions resulting in partitions : t0p0, t0p1, t0p2, t1p0,
> > t1p1.
> > >
> > > If we use the TopicRoundRobinAssignor, the assignment will be :
> > >
> > > A0 : [ C0: [t0p0, t0p2], C1: [t1p0] ]
> > >
> > > A1 : [ C0: [t0p1], C1: [t1p1] ]
> > >
> > >
> > >
> > > *How will the group leader know that T2 should not be re-assigned on
> the
> > > next rebalance? Can you elaborate a bit more on the mechanisms used to
> > > communicate this state to the other group members?*
> > >
> > >
> > >
> > > Currently, the group leader will not know that T2 should not be
> > > re-assigned on the next rebalance.
> > >
> > > For this first iteration, we simply keep in memory that T2 has a poison
> > > pill and therefore we ignore all incoming messages from T2. We
> basically
> > > consume them without acknowledging them.
> > >
> > > As you can imagine, in the case of having multiple instances of the
> same
> > > application, in case of error, the partition will be rebalanced to
> > another
> > > instance.
> > >
> > > Nevertheless, this is not really a problem (at least for our use
> cases):
> > > as soon as the poison pill is consumed, the consumer on this other
> > instance
> > > will be stopped as well, and so on. It will take a few tens of
> seconds
> > > before the last consumer of the poison pill is stopped, and with it the
> > > consumption of the entire topic.
> > >
> > > For a second iteration, we have 


Re: [DISCUSS] KIP-874: TopicRoundRobinAssignor

2022-10-24 Thread Sophie Blee-Goldman
Hey Mathieu,

Apologies if you already know this, but the partition assignor interface is
fully pluggable.
That means you can plug in a custom PartitionAssignor implementation,
without
having to go through a KIP or commit any code to the AK repo.
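
For example, pointing a consumer at a custom assignor is just a config
change (a minimal sketch; the class name here is hypothetical):

Properties props = new Properties();
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
          "com.example.TopicRoundRobinAssignor"); // your ConsumerPartitionAssignor impl
// ...plus the usual bootstrap servers, deserializers, group id, etc.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);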

I suspect some of the current unknowns and solutions will become clear when
you start
to actually write this assignor and test it out in your environment. Then
you can play around
with what works and what doesn't work, and come back to the KIP if desired
with a stronger
argument for why it's needed. Or you can just take your assignor and
publish it in a public
git repo for anyone who might have a similar use case as you.

Just my two cents, I'd recommend in this case you start with the
implementation before
worrying about donating your assignor back to the main repo with a KIP. If
you do want to,
it would then be much easier to convince people when they can see your
assignor logic
for themselves, and you'll be able to answer any questions.

Best,
Sophie

On Fri, Oct 21, 2022 at 2:21 AM Mathieu Amblard 
wrote:

> Hello everybody,
>
> Just to let you know that I have added a chapter about having multiple
> containers (multiple pods for Kubernetes) running the same application :
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-874%3A+TopicRoundRobinAssignor#KIP874:TopicRoundRobinAssignor-Howdoesitworkifwehavemultiplecontainersrunningthesameapplication
> ?
>
> Regards,
> Mathieu
>
> On Tue, Oct 11, 2022 at 20:00, Mathieu Amblard wrote:
>
> > Hi Hector,
> >
> >
> >
> > First, thank you for your questions !
> >
> >
> >
> > *If the goal is to do the partition assignments at a topic level,
> wouldn't
> > having single-partition topics solve this problem?*
> >
> >
> >
> > We are in a microservices environment; therefore, we can have multiple
> > containers running the same application.
> >
> > Using the TopicRoundRobinAssignor, the partitions are uniformly balanced
> > to each container, and we can get better performance.
> >
> > Let's suppose there are 2 instances of the same application A0 and A1, 2
> > consumers C0 and C1, and two topics t0 and t1. t0 has 3 partitions and t1
> > has two partitions resulting in partitions : t0p0, t0p1, t0p2, t1p0,
> t1p1.
> >
> > If we use the TopicRoundRobinAssignor, the assignment will be :
> >
> > A0 : [ C0: [t0p0, t0p2], C1: [t1p0] ]
> >
> > A1 : [ C0: [t0p1], C1: [t1p1] ]
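> >
> > To make the rule concrete, here is a simplified sketch of the computation
> > (not the actual PR source; it assumes the usual java.util imports and
> > org.apache.kafka.common.TopicPartition): each topic's partitions are dealt
> > round-robin to the members subscribed to that topic, which reproduces the
> > assignment above.
> >
> > Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
> >                                          Map<String, List<String>> membersPerTopic) {
> >     Map<String, List<TopicPartition>> assignment = new HashMap<>();
> >     for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
> >         List<String> members = membersPerTopic.get(topic.getKey());
> >         for (int p = 0; p < topic.getValue(); p++) {
> >             // deal partition p of this topic to the next subscribed member
> >             String member = members.get(p % members.size());
> >             assignment.computeIfAbsent(member, m -> new ArrayList<>())
> >                       .add(new TopicPartition(topic.getKey(), p));
> >         }
> >     }
> >     return assignment;
> > }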
> >
> >
> >
> > *How will the group leader know that T2 should not be re-assigned on the
> > next rebalance? Can you elaborate a bit more on the mechanisms used to
> > communicate this state to the other group members?*
> >
> >
> >
> > Currently, the group leader will not know that T2 should not be
> > re-assigned on the next rebalance.
> >
> > For this first iteration, we simply keep in memory that T2 has a poison
> > pill and therefore we ignore all incoming messages from T2. We basically
> > consume them without acknowledging them.
> >
> > As you can imagine, in the case of having multiple instances of the same
> > application, in case of error, the partition will be rebalanced to
> another
> > instance.
> >
> > Nevertheless, this is not really a problem (at least for our use cases):
> > as soon as the poison pill is consumed, the consumer on this other
> instance
> > will be stopped as well, and so on. It will take a few tens of seconds
> > before the last consumer of the poison pill is stopped, and with it the
> > consumption of the entire topic.
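> >
> > In code, this first iteration behaves roughly as follows (a sketch only;
> > process() and commitOffsetsExcept() are hypothetical helpers standing in
> > for our business logic and offset management):
> >
> > Set<String> failedTopics = new HashSet<>();
> > while (running) {
> >     ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(500));
> >     for (ConsumerRecord<String, byte[]> record : records) {
> >         if (failedTopics.contains(record.topic()))
> >             continue; // consume without acknowledging
> >         try {
> >             process(record);
> >         } catch (Exception poisonPill) {
> >             failedTopics.add(record.topic()); // stop this topic only, not the others
> >         }
> >     }
> >     commitOffsetsExcept(consumer, failedTopics); // hypothetical helper
> > }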
> >
> > For a second iteration, we plan to find a solution to avoid this delay
> > between the consumer on the first instance being stopped and
> the
> > last one. Currently, I do not have a solution, but we are considering
> > different options; avoiding rebalancing partitions that contain a poison
> pill
> > is one of them.
> >
> >
> >
> > Cheers,
> >
> > Mathieu
> >
> > On Fri, Oct 7, 2022 at 16:54, Hector Geraldino (BLOOMBERG/ 919 3RD A) <
> > hgerald...@bloomberg.net> wrote:
> >
> >> Hi Mathieu. I took a look at your KIP and have a couple questions.
> >>
> >> If the goal is to do the partition assignments at a topic level,
> wouldn't
> >> having single-partition topics solve this problem?
> >>
> >> You also mentioned that your goal is to minimize the potential of a
> >> poison pill message breaking all members of a group (by keeping track of
> >> which topics have 'failed'), but it is not clear how this can be
> achieved
> >> with this assignor. If we imagine a scenario where:
> >>
> >> * A group has 3 members (A, B, C)
> >> * Members are subscribed to 3 topics (T1, T2, T3)
> >> * Each member is assigned one topic (A[T1], B[T2], C[T3])
> >> * One member fails to consume from a topic/partition (B[T2]), and goes
> >> into failed state
> >>
> >> How will the group leader know that T2 should not be re-assigned on the
> >> next rebalance? Can you elaborate a bit more on the mechanisms used to
> >> communicate this state to the other group members?
> >>
> >> Thanks
> >>
> >> From: 


Faster connector config dev cycles - A vision and prototype

2022-10-24 Thread Greg Harris
Hey All,

I'd like to share with you a potential improvement to the workflow for
Connect users that may make connector configurations easier to develop and
debug. The focus for this is on reducing the number of manual steps
involved in seeing the effects of a config change, so that each iteration
is faster and a user can work through errors/misconfigurations faster.

The details should be found here:
https://gitlab.gregswebserver.com/gharris1727/kafka-notes/-/wikis/Wishlist/ConfigCycleTime

While exploring this idea, I implemented a change to the Connect Standalone
runner which watches the provided configuration files for changes, and
applies them to the worker as soon as they are discovered. The prototype
branch can be found here:
https://github.com/gharris1727/kafka/tree/prototype-standalone-config-watcher
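
As a rough illustration of the mechanism (a minimal sketch, not the actual
prototype code), the watching part boils down to java.nio's WatchService:

import java.nio.file.*;

public class ConfigWatcher {
    public static void main(String[] args) throws Exception {
        Path config = Paths.get(args[0]).toAbsolutePath();
        WatchService watcher = FileSystems.getDefault().newWatchService();
        config.getParent().register(watcher, StandardWatchEventKinds.ENTRY_MODIFY);
        while (true) {
            WatchKey key = watcher.take(); // blocks until the directory changes
            for (WatchEvent<?> event : key.pollEvents()) {
                if (config.getFileName().equals(event.context())) {
                    // The prototype would re-validate and re-apply the connector here.
                    System.out.println("Config changed, reapplying: " + config);
                }
            }
            key.reset();
        }
    }
}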

I wanted to put this out in order to ask a question of the maintainers:
Does this sort of usability improvement belong in apache/kafka, or some
external project? Would a KIP (or several) to implement this functionality
upstream be warranted for further discussion?

As-is, I believe that it is possible to implement all of this functionality
in an external utility and/or a RestExtension without any (significant)
upstream changes. This proposal would not constitute any strictly new
use-cases, but rather make the Standalone Mode more viable for the stated
use-case, which is for "ad hoc, small, or experimental jobs."

Thanks in advance for your comments,
Greg Harris


[jira] [Created] (KAFKA-14336) Reduce allocations from MetadataResponse#convertToNodeArray

2022-10-24 Thread David Schlosnagle (Jira)
David Schlosnagle created KAFKA-14336:
-

 Summary: Reduce allocations from 
MetadataResponse#convertToNodeArray
 Key: KAFKA-14336
 URL: https://issues.apache.org/jira/browse/KAFKA-14336
 Project: Kafka
  Issue Type: Improvement
  Components: admin
Reporter: David Schlosnagle


While profiling a Kafka consumer application that utilizes 
`org.apache.kafka.clients.admin.Admin#listOffsets(java.util.Map)`,
 one of the largest aggregate memory allocations is coming from 
`org.apache.kafka.common.requests.MetadataResponse.convertToNodeArray(List, 
Map)` due to the use of a stream reference pipeline. 

We can avoid allocating the stream reference pipeline & spliterator in this 
case by explicitly allocating a presized `Node[]` and using a for loop with 
`int` induction over the specified IDs `List` argument.

{code}
java.util.stream.ReferencePipeline$3
  at java.util.stream.ReferencePipeline.map(Function)
  at org.apache.kafka.common.requests.MetadataResponse.convertToNodeArray(List, Map)
  at org.apache.kafka.common.requests.MetadataResponse.toPartitionInfo(MetadataResponse$PartitionMetadata, Map)
  at org.apache.kafka.common.requests.MetadataResponse.cluster()
  at org.apache.kafka.clients.admin.KafkaAdminClient.getListOffsetsCalls(MetadataOperationContext, Map, Map)
  at org.apache.kafka.clients.admin.KafkaAdminClient.lambda$listOffsets$21(MetadataOperationContext, Map, Map)
  at org.apache.kafka.clients.admin.KafkaAdminClient$$Lambda$3539+0x00080142d120.1364252822.get()
  at org.apache.kafka.clients.admin.KafkaAdminClient$23.handleResponse(AbstractResponse)
  at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(long, List)
  at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests()
  at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run()
  at java.lang.Thread.run()
{code}
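
A sketch of the proposed replacement (approximate; the real method's null 
handling may differ slightly):

{code}
// Presized array + indexed loop instead of a stream pipeline.
private static Node[] convertToNodeArray(List<Integer> replicaIds, Map<Integer, Node> nodesById) {
    Node[] nodes = new Node[replicaIds.size()];
    for (int i = 0; i < replicaIds.size(); i++) {
        Integer id = replicaIds.get(i);
        Node node = nodesById.get(id);
        nodes[i] = node != null ? node : new Node(id, "", -1);
    }
    return nodes;
}
{code}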





[jira] [Created] (KAFKA-14335) Admin.listConsumerGroups should allow filtering, pagination

2022-10-24 Thread Kirk True (Jira)
Kirk True created KAFKA-14335:
-

 Summary: Admin.listConsumerGroups should allow filtering, 
pagination
 Key: KAFKA-14335
 URL: https://issues.apache.org/jira/browse/KAFKA-14335
 Project: Kafka
  Issue Type: Improvement
  Components: admin, clients, protocol
Affects Versions: 3.3.0
Reporter: Kirk True
Assignee: Kirk True


The 
[`Admin.listConsumerGroups`|https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/admin/Admin.html#listConsumerGroups--]
 method is the interface for clients to view the consumer groups in the 
cluster. When the list of consumer groups becomes very large, it can cause 
problems for the client (e.g., OOM errors) as well as overhead for the broker 
and network.

The proposal is to enhance the 
[`ListConsumerGroupsOptions`|https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.html]
 class to have optional values such as:
 * Consumer group ID regex (evaluated on the broker)
 * Pagination token (consumer group ID, probably)

This will require a KIP since it is enhancing the admin API, protocol, and 
broker.
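
A hypothetical sketch of what the enhanced options could look like (all 
option names here are illustrative, not a final design):

{code}
ListConsumerGroupsOptions options = new ListConsumerGroupsOptions()
    .groupIdRegex("payments-.*")        // hypothetical: evaluated on the broker
    .pageToken("payments-service-0042") // hypothetical: resume after this group ID
    .pageSize(500);                     // hypothetical: max groups per response
ListConsumerGroupsResult result = admin.listConsumerGroups(options);
{code}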






[jira] [Resolved] (KAFKA-13518) Update gson dependency

2022-10-24 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-13518.
---
Fix Version/s: 3.4.0
   Resolution: Fixed

> Update gson dependency
> --
>
> Key: KAFKA-13518
> URL: https://issues.apache.org/jira/browse/KAFKA-13518
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.0.0
>Reporter: Pavel Kuznetsov
>Assignee: Dongjin Lee
>Priority: Major
>  Labels: security
> Fix For: 3.4.0
>
>
> *Describe the bug*
> I checked the kafka_2.13-3.0.0.tgz distribution with WhiteSource and found out 
> that some libraries have vulnerabilities.
> Here they are:
> * gson-2.8.6.jar has WS-2021-0419 vulnerability. The way to fix it is to 
> upgrade to com.google.code.gson:gson:2.8.9
> * netty-codec-4.1.65.Final.jar has CVE-2021-37136 and CVE-2021-37137 
> vulnerabilities. The way to fix it is to upgrade to 
> io.netty:netty-codec:4.1.68.Final
> *To Reproduce*
> Download kafka_2.13-3.0.0.tgz and find jars, listed above.
> Check that these jars with corresponding versions are mentioned in 
> corresponding vulnerability description.
> *Expected behavior*
> * gson upgraded to 2.8.9 or higher
> * netty-codec upgraded to 4.1.68.Final or higher
> *Actual behaviour*
> * gson is 2.8.6
> * netty-codec is 4.1.65.Final





Re: [DISCUSS] (continued) KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-10-24 Thread Jun Rao
Hi, David,

Thanks for the updated KIP. A few more comments.

90. ConsumerGroupTargetAssignmentMemberValue:
90.1 Do we need to include MemberId here given that it's in the key already?
90.2 Since there is no new record if the new member assignment is the same,
it seems that AssignmentEpoch doesn't always reflect the correct assignment
epoch? If so, do we still need this field? Could we just depend
on ConsumerGroupTargetAssignmentMetadataValue.AssignmentEpoch?

91. "The AssignmentEpoch corresponds to the group epoch used to compute the
assignment. It is not necessarily the last one." Could you explain what "It
is not necessarily the last one." means?

Thanks,

Jun


On Mon, Oct 24, 2022 at 7:49 AM Magnus Edenhill  wrote:

> Hi, one minor comment on the latest update:
>
>
> On Mon, Oct 24, 2022 at 16:26, David Jacot wrote:
>
> > * Jason pointed out that the member id handling is a tad weird. The
> > group coordinator generates the member id and then trusts the member
> > when it rejoins the group. This also implies that the client could
> > directly generate its member id and the group coordinator will accept
> > it. It seems better to directly let the client generate the id instead of
> > relying on the group coordinator. I have updated the KIP in this
> > direction. Note that the new APIs still use a string for the member id
> > in order to remain consistent with the existing APIs.
> >
>
> We had a similar discussion for id generation in KIP-714 and I'd advise
> against client-side id generation for a couple of reasons:
>  - it is much more likely for the client side prng to be poorly seeded, or
> incorrectly implemented, than the server side.
>This risks two different consumer instances generating the same id.
>  - it adds an extra dependency on the client, a uuid library/module, which
> brings with it the usual plethora
>of linking conflicts, package availability issues, etc.
>  - as for trusting the authenticity of the id; with server-side generation
> we at least have a (future) possibility for verifying the id, would it ever
> become an issue.
>
>
> Regards,
> Magnus
>


[jira] [Created] (KAFKA-14334) DelayedFetch purgatory not completed when appending as follower

2022-10-24 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-14334:


 Summary: DelayedFetch purgatory not completed when appending as 
follower
 Key: KAFKA-14334
 URL: https://issues.apache.org/jira/browse/KAFKA-14334
 Project: Kafka
  Issue Type: Bug
Reporter: Jeff Kim


Currently, the ReplicaManager.delayedFetchPurgatory is only completed when 
appending as leader. With 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica]
 enabled, followers will also have to complete delayed fetch requests after 
successfully replicating. Otherwise, consumer fetches to closest followers will 
hit fetch.max.wait.ms.





Re: [DISCUSS] (continued) KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-10-24 Thread Magnus Edenhill
Hi, one minor comment on the latest update:


On Mon, Oct 24, 2022 at 16:26, David Jacot wrote:

> * Jason pointed out that the member id handling is a tad weird. The
> group coordinator generates the member id and then trusts the member
> when it rejoins the group. This also implies that the client could
> directly generate its member id and the group coordinator will accept
> it. It seems better to directly let the client generate the id instead of
> relying on the group coordinator. I have updated the KIP in this
> direction. Note that the new APIs still use a string for the member id
> in order to remain consistent with the existing APIs.
>

We had a similar discussion for id generation in KIP-714 and I'd advise
against client-side id generation for a couple of reasons:
 - it is much more likely for the client side prng to be poorly seeded, or
incorrectly implemented, than the server side.
   This risks two different consumer instances generating the same id.
 - it adds an extra dependency on the client, a uuid library/module, which
brings with it the usual plethora
   of linking conflicts, package availability issues, etc.
 - as for trusting the authenticity of the id; with server-side generation
we at least have a (future) possibility for verifying the id, would it ever
become an issue.


Regards,
Magnus


Re: [VOTE] KIP-863: Reduce Fetcher#parseRecord() memory copy

2022-10-24 Thread ShunKang Lin
Hi everyone,

Thanks Luke.
I have submitted a PR  https://github.com/apache/kafka/pull/12545, please
take a look.

Best,
ShunKang

Luke Chen wrote on Mon, Oct 24, 2022 at 09:56:

> Hi ShunKang,
> Yes, this vote can be concluded.
> You can start to implement it.
>
> Thanks.
> Luke
>
>
> On Fri, Oct 21, 2022 at 9:45 PM ShunKang Lin 
> wrote:
>
> > Hi everyone,
> >
> > Thank you for the vote. I've got three +1 votes (Guozhang, Luke, Chris),
> > can this vote be concluded?
> >
> > Best,
> > ShunKang
> >
> > Chris Egerton wrote on Wed, Oct 12, 2022 at 23:17:
> >
> > > +1 (binding)
> > > Thanks ShunKang!
> > >
> > > On Tue, Oct 11, 2022 at 9:26 PM Luke Chen  wrote:
> > >
> > > > +1 from me.
> > > > Thanks for the KIP.
> > > >
> > > > Luke
> > > >
> > > > On Fri, Sep 23, 2022 at 1:50 AM Guozhang Wang 
> > > wrote:
> > > >
> > > > > +1, thanks ShunKang.
> > > > >
> > > > > Though its proposed motivation is on consumer fetcher's
> > > deserialization,
> > > > I
> > > > > think adding an overloaded method with ByteBuffer would help with
> > other
> > > > > serde places on the client side as well.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Thu, Sep 22, 2022 at 9:41 AM ShunKang Lin <
> > > linshunkang@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > I'd like to open the vote for KIP-863, which proposes to reduce
> > > memory
> > > > > > allocation and memory copying in
> > Fetcher#parseRecord(TopicPartition,
> > > > > > RecordBatch, Record).
> > > > > >
> > > > > > The proposal is here:
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035
> > > > > >
> > > > > > Thanks to all who reviewed the proposal, and thanks in advance
> for
> > > > taking
> > > > > > the time to vote!
> > > > > >
> > > > > > Best,
> > > > > > ShunKang
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] (continued) KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-10-24 Thread David Jacot
Hi all, hi Jason,

Based on Jason's feedback, I have updated the KIP as follows.

* Jason pointed out that the member id handling is a tad weird. The
group coordinator generates the member id and then trusts the member
when it rejoins the group. This also implies that the client could
directly generate its member id and the group coordinator will accept
it. It seems better to directly let the client generate the id instead of
relying on the group coordinator. I have updated the KIP in this
direction. Note that the new APIs still use a string for the member id
in order to remain consistent with the existing APIs.

* Using one record to store the target assignment limits the
scalability because of the maximum batch size (1MB by default).
Instead, I propose to use N records to persist the member assignments
followed by one record for the assignment metadata. When the assignment
is updated, the group coordinator will only write the difference
between the new and the old assignment. Those records must be written
atomically to the log. As a first step, I propose to rely on the
atomicity of the batch to achieve this. In the future, we could extend
this with a mechanism similar to KIP-868. I have added this to the
future section.
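
To illustrate what "only write the difference" means, a toy sketch (the
types are illustrative, not the actual record schemas):

// Only members whose target partitions changed produce a record; a removed
// member produces an empty entry (akin to a tombstone). Unchanged members
// write nothing, keeping each delta batch well under the default 1MB limit.
Map<String, List<Integer>> delta(Map<String, List<Integer>> oldTarget,
                                 Map<String, List<Integer>> newTarget) {
    Map<String, List<Integer>> changed = new HashMap<>();
    newTarget.forEach((member, partitions) -> {
        if (!partitions.equals(oldTarget.get(member)))
            changed.put(member, partitions);
    });
    oldTarget.keySet().forEach(member -> {
        if (!newTarget.containsKey(member))
            changed.put(member, List.of());
    });
    return changed;
}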

* I have added the upgrade/downgrade to the future work section. As
Jason pointed out, the group coordinator has always been bad from a
downgrade perspective. This is something that we should improve.
Jason also suggests using a separate feature flag (e.g.
group.metadata.version) instead of relying on the metadata.version used by
the quorum controller. While I tend to agree with this, I think that
we need to think it through, so I propose to revisit this in a
subsequent KIP focused on the upgrade/downgrade.

Best,
David

On Thu, Oct 20, 2022 at 5:56 PM David Jacot  wrote:
>
> Hi Jason,
>
> Thanks for your comments. Please find my answers below.
>
> 1. Yeah, I agree that the handling is a tad awkward. It is interesting
> to note that the client could actually generate its own member id with
> the current handling. It may be better to just standardize on doing
> this. I cannot really think of any downsides for it.
>
> 2. I was not aware of the aim to have the --release flag in the
> future. That would definitely eliminate my concern. I will go ahead
> and update the KIP. I will also mention the downgrade as a future
> improvement. We can cover this in a future KIP.
>
> 3. I wanted to avoid having to recompute the assignment if we discover
> that it is corrupted. That is not an issue when a server side assignor
> is used but it could take time when a client side assignor is used.
> This is not ideal because during this time the members are without a
> target assignment and we cannot rely on the previous one because it
> may be gone (or partially compacted to be precise). Members could not
> even be fully reconciled with the previous assignment so the entire
> reconciliation process would be stuck.
>
> The simplest solution that I could think of is to use separate records
> for assignment metadata and member assignments, and to rely on the
> batch to enforce the atomicity of the update. The assignment will be
> updated most likely incrementally so the number of records should be
> rather small in each delta update. This means that we would be rather
> good with the default 1MB limit in this case. This would also allow us
> to move towards the transaction based solution in the future if we
> have to. This would already be better than the solution proposed in
> the KIP.
>
> Best,
> David
>
> On Thu, Oct 20, 2022 at 2:59 PM Jason Gustafson
>  wrote:
> >
> > Hi David,
> >
> > Thanks for the response. Replies below:
> >
> > > To answer your two other questions.
> > The consumer would keep it after a session timeout but the coordinator
> > would not. The coordinator would return UNKNOWN_MEMBER_ID when the
> > member id does not exist and the member epoch is not equal to zero.
> > This differentiation is perhaps overkill here. What do you think?
> >
> > I do think the concept of an incarnation ID is useful. It allows us to
> > continue tracking consumer instances as they join and leave the group,
> > which is useful when debugging. I find the handling a tad awkward
> > though. It sounds like we are trying not to let the UUID be generated on
> > the client side, but after the coordinator drops the state, then it has
> > to trust the client. That is probably fine I guess. Alternatively, we could
> > separate
> > the incarnation ID. Let the client provide the value for that, but let the
> > coordinator provide a member (or session) ID which is only valid for the
> > duration of the session. Not really sure it's worth having two IDs, but it
> > would get around the slight awkwardness. Or perhaps we could let the
> > coordinator trust the client-provided ID consistently. A small benefit of
> > either of these approaches is that the ID would cover the entire lifecycle
> > of the consumer, and not just the time 

Re: [VOTE] KIP-710: Full support for distributed mode in dedicated MirrorMaker 2.0 clusters

2022-10-24 Thread Chris Egerton
+1 (binding). Thanks for the KIP!

On Sat, Oct 22, 2022 at 3:38 AM Urbán Dániel  wrote:

> Hi everyone,
>
> I would like to start a vote on KIP-710 which aims to support running a
> dedicated MM2 cluster in distributed mode:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters
>
> Regards,
> Daniel
>
>
>


Re: [DISCUSS] KIP-866 ZooKeeper to KRaft Migration

2022-10-24 Thread Andrew Grant
Hey David,


Thanks for the KIP. I had a few small questions.


“The ZK data migration will copy the existing ZK data into the KRaft
metadata log and establish the new KRaft active controller as the active
controller from a ZK perspective.”

How will the code get all the ZooKeeper config? Will it do some sort of
scan of the ZooKeeper data store? Also I’m not a ZooKeeper expert but per
https://zookeeper.apache.org/doc/current/zookeeperInternals.html “Read
operations in ZooKeeper are *not linearizable* since they can return
potentially stale data.” Do we need to do anything special to make sure we
get all data from ZooKeeper?


“For the initial migration, the controller will utilize KIP-868 Metadata
Transactions to write all of the ZK metadata in a single transaction. If the
controller fails before this transaction is finalized, the next active
controller will abort the transaction and restart the migration process.”
What happens if
we commit the transaction, then fail right after that and restart? This sort
of leads me to wonder whether we will/should check the metadata log state
before doing the migration? That could also catch mistakes such as if a
KRaft quorum is used that already had some metadata which I assume we want
to prevent.


For the Test Plan section do you think it’s worth calling out performance
testing migrations of ZooKeeper deployments that have “large” amounts of
metadata?


Thanks,

Andrew

On Mon, Oct 24, 2022 at 3:20 AM Luke Chen  wrote:

> Hi David,
>
> Thanks for the good and complicated proposal! :)
> Some questions:
>
> 1. "MigrationReady" state: The KIP said:
> The KRaft quorum has been started
> I don't think we'll automatically enter this state after KRaft quorum
> started, do we?
> I think KRaft active controller should take over leadership by writing a
> znode in the /controller path before we enter this state, is that correct?
>
> 2. "MigrationActive" state: the KIP said:
> ZK state has been migrated, controller is in dual-write mode, brokers are
> being restarted in KRaft mode
> What confuses me is the last part: "brokers are being restarted in KRaft
> mode".
> How could we detect brokers are being restarted in KRaft mode? Old ZK
> broker is removed and new KRaft broker is up within N minutes?
> I think we don't have to rely on the condition "brokers are being restarted
> in KRaft mode" to enter this state.
> "brokers are being restarted in KRaft mode" should be a process happened
> between "MigrationActive" and "MigrationFinished". Does that make sense?
>
> 3. When "Zookeeper" mode trying to enter "MigrationEligible", if there is a
> cluster with tens of brokers, how could users know why this cluster cannot
> be in "MigrationEligible" state, yet? Check that znode manually one by one?
> Or do we plan to create a tool to help them? Or maybe expose the "unready
> ZK brokers" metrics?
>
> 4. Same for "MigrationActive" entering "MigrationFinished" state. Since
> there will be some process for restarting ZK brokers into KRaft brokers one by
> one, could we expose the "remaining ZK brokers" as metrics?
>
> 5. When users are in "MigrationReady"/"MigrationActive"/"MigrationFinished"
> states, and they accidentally change the KRaft controller config:
> "kafka.metadata.migration.enable"
> to false. What will happen to this cluster? No dual-write for it? Will we
> have any protection for it?
>
> 6. About the "MigrationState" metric:
> The "ZooKeeper" and "MigrationEligible" is reported by ZK controller, and
> the rest of states are reported by KRaft controller -->  makes sense to me.
> One question from it is, when KRaft controller takes over the leadership
> from ZK controller, what will the "MigrationState" value in old ZK
> controller? Keeping "MigrationEligible" doesn't make sense. Will there be an
> empty or null state?
>
> 7. About the "MetadataType" metric:
> An enumeration of: ZooKeeper (1), Dual (2), KRaft (3). Each broker reports
> this.
> I don't know how we could map the migration state to these 3 types.
> What is the metadataType when cluster in "MigrationReady" state? Still
> Zookeeper?
> When will brokers enter Dual type?
> This is unclear in the KIP.
>
> Thank you.
> Luke
>
> On Thu, Oct 20, 2022 at 11:33 PM David Arthur
>  wrote:
>
> > Igor, thanks for taking a look! Since JBOD in KRaft is still under
> > discussion and not likely to land before the ZK migration, I think we'll
> > need to defer it. For migrating JBOD clusters from ZK to KRaft, we'll
> also
> > need to address the log dir failure mechanism which currently uses a
> > special ZNode written to by the brokers. There is an old KIP
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller
> > which proposes a new RPC to move that ZK write to the controller, but I'm
> > not sure if that's the approach we'll want to take. I read through the
> > discussion over in KIP-858 and it 

[jira] [Created] (KAFKA-14333) Kafka Admin delete records java doc is missing information about supported cleanup.policy

2022-10-24 Thread Tomasz Kaszuba (Jira)
Tomasz Kaszuba created KAFKA-14333:
--

 Summary: Kafka Admin delete records java doc is missing 
information about supported cleanup.policy
 Key: KAFKA-14333
 URL: https://issues.apache.org/jira/browse/KAFKA-14333
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 3.3.1
Reporter: Tomasz Kaszuba


The deleteRecords command on the KafkaAdminClient will throw an exception if 
the cleanup.policy on a topic is not set to delete, yet this is not mentioned 
in the java doc.

[https://github.com/apache/kafka/blob/968e18cc799faa6b2b5f94e4f376bc768895afd2/core/src/main/scala/kafka/cluster/Partition.scala#L1241-L1242]

+Java Doc+
[https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/admin/KafkaAdminClient.html#deleteRecords-java.util.Map-org.apache.kafka.clients.admin.DeleteRecordsOptions-]
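
For illustration, the behavior that ought to be documented (a sketch; it 
assumes a topic whose cleanup.policy is compact, and an enclosing method 
that throws Exception):

{code}
try (Admin admin = Admin.create(props)) {
    Map<TopicPartition, RecordsToDelete> toDelete = Map.of(
        new TopicPartition("compacted-topic", 0), RecordsToDelete.beforeOffset(100L));
    // Fails because the topic's cleanup.policy does not include "delete";
    // the returned future completes exceptionally (PolicyViolationException).
    admin.deleteRecords(toDelete).all().get();
}
{code}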

 





[jira] [Created] (KAFKA-14332) Split out checkstyle configs between test and main

2022-10-24 Thread Matthew de Detrich (Jira)
Matthew de Detrich created KAFKA-14332:
--

 Summary: Split out checkstyle configs between test and main
 Key: KAFKA-14332
 URL: https://issues.apache.org/jira/browse/KAFKA-14332
 Project: Kafka
  Issue Type: Bug
Reporter: Matthew de Detrich








[jira] [Created] (KAFKA-14331) Upgrade to Scala 2.13.10

2022-10-24 Thread Viktor Somogyi-Vass (Jira)
Viktor Somogyi-Vass created KAFKA-14331:
---

 Summary: Upgrade to Scala 2.13.10
 Key: KAFKA-14331
 URL: https://issues.apache.org/jira/browse/KAFKA-14331
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 3.4.0
Reporter: Viktor Somogyi-Vass


There are some CVEs in Scala 2.13.8, so we should upgrade to the latest.





[jira] [Created] (KAFKA-14330) The Kafka Connection Is High During Service Startup

2022-10-24 Thread flybird (Jira)
flybird created KAFKA-14330:
---

 Summary: The Kafka Connection Is High During Service Startup
 Key: KAFKA-14330
 URL: https://issues.apache.org/jira/browse/KAFKA-14330
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Affects Versions: 2.8.1
Reporter: flybird


The Kafka connection count is high during service startup.

During a service restart, the number of Kafka connections suddenly increases 
for a period of time: the actual number of connections from the process is 328. 
However, based on the process's consumer and producer thread counts, the 
expected number is: consumer threads x 2 + producer threads = 283. After a 
while, the number of connections drops back to 283.

TCP connections are created when the KafkaConsumer.poll method is invoked. 
There are three points inside the poll method where a TCP connection is created:
1. When the findCoordinator request is initiated to determine the 
coordinator and obtain the cluster metadata.
2. When connecting to the coordinator to enable normal group coordination 
operations.
3. When fetching the actual messages while consuming data.
Once a type 3 TCP connection is successfully created, the consumer program 
discards the type 1 TCP connection and uses the type 3 TCP connection 
instead when periodically requesting metadata. For a consumer program that has 
been running for a while, only the latter two types of TCP connections remain.

In summary, the producer creates one connection and the consumer creates three 
connections. However, the first type of connection is closed after the third 
type of connection is created, so in the end there are only two connections.

The first type of connection is therefore temporary, and it is what causes the 
temporarily high connection count. Why not close the first type of connection 
immediately after the third type is created?





[jira] [Created] (KAFKA-14329) KafkaAdminClient#listOffsets should query by partition dimension

2022-10-24 Thread shizhenzhen (Jira)
shizhenzhen created KAFKA-14329:
---

 Summary: KafkaAdminClient#listOffsets should query by partition 
dimension
 Key: KAFKA-14329
 URL: https://issues.apache.org/jira/browse/KAFKA-14329
 Project: Kafka
  Issue Type: Improvement
  Components: admin
Affects Versions: 2.7.2
Reporter: shizhenzhen
 Attachments: image-2022-10-24-16-18-01-418.png, 
image-2022-10-24-16-20-31-778.png

 

When the listOffsets API queries the offsets of TopicPartitions, it should 
normally query by partition dimension.

For example, if I query Topic1-0 and Topic1-1, the corresponding data is 
returned.

Under normal circumstances that is indeed the case, but before this API is 
called, the Metadata API is called first.

When the Metadata response is processed, if any partition of a requested topic 
has no leader, an exception is thrown, as shown below:

!image-2022-10-24-16-20-31-778.png!

!image-2022-10-24-16-18-01-418.png!

Consider this scenario:

Topic1 has 3 partitions: Topic1-0, Topic1-1, and Topic1-2.

It just so happens that partition Topic1-2 has leader = -1 for some reason.

Now, when I want to query the offsets of only the two partitions Topic1-0 and 
Topic1-1, the client directly throws an exception at me.

Logically, since my query does not involve the problematic Topic1-2, I should 
be able to get the data.

Instead I get an exception, which is very unfriendly.
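
For reference, the call in question looks like this (a sketch; the 
expectation is that querying only healthy partitions should succeed):

{code}
Map<TopicPartition, OffsetSpec> query = Map.of(
    new TopicPartition("Topic1", 0), OffsetSpec.latest(),
    new TopicPartition("Topic1", 1), OffsetSpec.latest());
// Today this can fail while handling the preceding Metadata response if
// Topic1-2 has leader = -1, even though Topic1-2 is not part of the query.
ListOffsetsResult result = admin.listOffsets(query);
{code}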





Re: [DISCUSS] KIP-866 ZooKeeper to KRaft Migration

2022-10-24 Thread Luke Chen
Hi David,

Thanks for the good and complicated proposal! :)
Some questions:

1. "MigrationReady" state: The KIP said:
The KRaft quorum has been started
I don't think we'll automatically enter this state after KRaft quorum
started, do we?
I think KRaft active controller should take over leadership by writing a
znode in the /controller path before we enter this state, is that correct?

2. "MigrationActive" state: the KIP said:
ZK state has been migrated, controller is in dual-write mode, brokers are
being restarted in KRaft mode
What confuses me is the last part: "brokers are being restarted in KRaft
mode".
How could we detect brokers are being restarted in KRaft mode? Old ZK
broker is removed and new KRaft broker is up within N minutes?
I think we don't have to rely on the condition "brokers are being restarted
in KRaft mode" to enter this state.
"brokers are being restarted in KRaft mode" should be a process happened
between "MigrationActive" and "MigrationFinished". Does that make sense?

3. When "Zookeeper" mode trying to enter "MigrationEligible", if there is a
cluster with tens of brokers, how could users know why this cluster cannot
be in "MigrationEligible" state, yet? Check that znode manually one by one?
Or do we plan to create a tool to help them? Or maybe expose the "unready
ZK brokers" metrics?

4. Same for "MigrationActive" entering "MigrationFinished" state. Since
there will be some process for restarting ZK brokers into KRaft brokers one by
one, could we expose the "remaining ZK brokers" as metrics?

5. When users are in "MigrationReady"/"MigrationActive"/"MigrationFinished"
states, and they accidentally change the KRaft controller config:
"kafka.metadata.migration.enable"
to false. What will happen to this cluster? No dual-write for it? Will we
have any protection for it?

6. About the "MigrationState" metric:
The "ZooKeeper" and "MigrationEligible" is reported by ZK controller, and
the rest of states are reported by KRaft controller -->  makes sense to me.
One question from it is, when KRaft controller takes over the leadership
from ZK controller, what will the "MigrationState" value in old ZK
controller? Keeping "MigrationEligible" doesn't make sense. Will there be an
empty or null state?

7. About the "MetadataType" metric:
An enumeration of: ZooKeeper (1), Dual (2), KRaft (3). Each broker reports
this.
I don't know how we could map the migration state to these 3 types.
What is the metadataType when cluster in "MigrationReady" state? Still
Zookeeper?
When will brokers enter Dual type?
This is unclear in the KIP.

Thank you.
Luke

On Thu, Oct 20, 2022 at 11:33 PM David Arthur
 wrote:

> Igor, thanks for taking a look! Since JBOD in KRaft is still under
> discussion and not likely to land before the ZK migration, I think we'll
> need to defer it. For migrating JBOD clusters from ZK to KRaft, we'll also
> need to address the log dir failure mechanism which currently uses a
> special ZNode written to by the brokers. There is an old KIP
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller
> which proposes a new RPC to move that ZK write to the controller, but I'm
> not sure if that's the approach we'll want to take. I read through the
> discussion over in KIP-858 and it sounds like there are some good ideas
> there.
>
> To answer your question more directly, migrating ZK clusters using JBOD is
> out of scope for this KIP. It might be possible to stop using JBOD using
> reassignments, but I'm not sure about that.
>
> -David
>
> On Tue, Oct 18, 2022 at 12:17 PM Igor Soarez  wrote:
>
> > Hi David,
> >
> > Thanks for the KIP, this is very exciting!
> >
> > How does JBOD relate to this work? KRaft mode doesn't yet support
> > configuring brokers with multiple log directories. If the brokers in the
> > existing cluster are configured with multiple log dirs, does the
> migration
> > imply that the existing brokers need to drop use of that feature? Or is
> > there some way to upgrade them later?
> >
> > Thanks,
> >
> > --
> > Igor
> >
> > On Mon, Oct 17, 2022, at 10:07 PM, David Arthur wrote:
> > > I've updated the KIP with the following changes (the confluence diff is
> > not
> > > helpful here since I rearranged some things)
> > >
> > > * Added ZooKeeperBlockingKRaftMillis metric
> > > * Added section on new broker registration JSON
> > > * Removed section on MigrationCheck RPC
> > > * Added change to UpdateMetadataRequest
> > > * Added section "Additional ZK Broker Configs" (includes configs to
> > connect
> > > to KRaft quorum)
> > > * Added section on "Incompatible Brokers" under Failure Modes
> > > * Clarified many things per this discussion thread
> > >
> > > I realized we need the KRaft controller to pick the correct
> > > "metadata.version" when initializing the migration. I included the IBP
> > of a
> > > broker in its registration data so the KRaft controller can verify the
> > IBP
> > > and pick the correct "metadata.version" when starting