Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-19 Thread Matthias J. Sax
One more thing. It might be good to clearly call out which interfaces a 
user would implement, vs. the ones Kafka Streams implements and the 
TaskAssignor only uses.


My understanding is that users would implement `TaskAssignor`, 
`TaskAssignment`, and `StreamsClientAssignment`.


For `AssignedTask`, it seems that users would actually only need to 
instantiate it. Should we add a public constructor?
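
For example, a custom assignor could then build up assignments directly 
(sketch only, against the KIP's proposed types; not final API):

    // hypothetical sketch, assuming a public AssignedTask constructor
    Set<AssignedTask> tasks = new HashSet<>();
    tasks.add(new AssignedTask(new TaskId(0, 0), AssignedTask.Type.ACTIVE));
    tasks.add(new AssignedTask(new TaskId(0, 1), AssignedTask.Type.STANDBY));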


Also wondering if we should add an empty default implementation for 
`onAssignmentComputed()`, as it does not seem strictly necessary to use 
this method?
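
Something like this is what I have in mind (sketch only; the exact 
callback parameters are whatever the KIP settles on):

    public interface TaskAssignor {
        TaskAssignment assign(ApplicationState applicationState);

        // empty default, so implementing the callback becomes optional
        default void onAssignmentComputed(/* final assignment, per the KIP */) { }
    }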



-Matthias

On 4/19/24 7:30 PM, Matthias J. Sax wrote:

[...]

Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2829

2024-04-19 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-19 Thread Matthias J. Sax

Great KIP. I have some minor comments/questions:


100 The KIP says: "In the future, additional plugins can use the same 
partition.assignor  prefix". What does this mean?



101 (nit) The KIP says: "Note that the thread-level assignment will 
remain an un-configurable internal implementation detail of the 
partition assignor (see "Rejected Alternatives" for further thoughts and 
reasoning)." -- When I was reading this the first time, I did not 
understand it, and it only became clear later (e.g., while reading the 
discussion thread). I think it would be good to be a bit more 
explicit, because this is not just some minor thing, but a core design 
decision (which I, btw, support).



102 (nit/typo): taskAssignor -> TaskAssignor (somewhere in the text).


103 (nit): "new non-internal package" -> replace 'non-internal' with 
'public' :)



104: Method name `TaskAssignor#onAssignmentComputed()` -> the name seems 
to be a little bit clumsy? I kinda like the original `finalAssignment()` 
-- I would also be happy with `onFinalAssignment` to address Bruno's 
line of thinking (which I think is a good call out). (Btw: 
`finalAssignment` is still used in the text on the KIP and should also 
be updated.)



105: Please remove all `private` variables. We should only show public 
stuff on the KIP. Everything else is an implementation detail.



106: `TaskAssignment#numStreamsClients()` -- why do we need this method? 
It seems calling `assignment()` gives us a collection, and we can just call 
size() on it to get the same value? -- Also, why do we explicitly call 
out the override of `toString()`; it seems unnecessary?
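
For example (assuming `assignment()` returns the collection of per-client 
assignments, as the KIP describes):

    // equivalent to the proposed numStreamsClients()
    int numStreamsClients = taskAssignment.assignment().size();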



107 `StreamsClientState#numStreamThreads` JavaDocs say: "Returns the 
number of StreamThreads on this client, which is equal to the number of 
main consumers and represents its overall capacity." -- Given our 
planned thread refactoring, this might not hold true for long (and I 
am sure we will forget to update the JavaDocs later). Talking to Lucas, 
the plan is to cut down `StreamsThread` to host the consumer (and there 
will be only one, and it won't be configurable any longer), and we would 
introduce a number of configurable "processing threads". Can/should we 
build this API in a forward-looking manner?



108: Why do we need 
`StreamsClientAssignment#followupRebalanceDeadline()` -- wondering how 
this would be useful?



109 `StreamsClientState#consumers`: should we rename this to 
`#consumerClientIds()`?



110 (nit) `StreamsClientState#previous[Active|Standby]Tasks`: JavaDoc 
says 'owned by consumers on this node' -- Should we just say 'owned by 
the Streams client'?



111 `StreamsClientState#prevTasksByLag()`: it takes a `String consumer` 
parameter -- not clear what this is -- I guess it's a consumer's 
client.id? If yes, should we rename the parameter `consumerClientId`?



112 `ApplicationState`: what is the reason to have `allTasks()` and 
`statefulTasks()` -- why not have `statelessTasks()` and 
`statefulTasks()` instead? Or all three?



113 `ApplicationState#computeTaskLags()`: I understand the intent/reason 
why we have this one, but it seems to be somewhat difficult to use 
correctly, as it triggers an internal side effect... Would it be 
possible to replace this method in favor of passing a `boolean 
computeTaskLags` parameter into #streamsClientStates() instead? That 
might make it less error-prone to use, as it seems the returned 
`StreamsClient` object would be modified when calling #computeTaskLags(), 
and thus both are related to each other.
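
A sketch of what I mean (illustrative signature only):

    public interface ApplicationState {
        // lag computation becomes an explicit opt-in at the call site,
        // instead of a separate side-effecting computeTaskLags() method
        Map<ProcessId, StreamsClientState> streamsClientStates(boolean computeTaskLags);
    }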



114 nit/typo: `ApplicationState#streamsClientStates()` returns 
`StreamsClientState` not `StreamsClient`.



115 `StreamsAssignorRetryableException`: not sure if I fully understand 
the purpose of this exception.



116 "No actual changes to functionality": allowing to plug in customer 
TaskAssignor sounds like adding new functionality. Can we rephrase this?




117: What happens if the returned assignment is "invalid" -- for 
example, a task might not have been assigned, or is assigned to two 
nodes? Or a standby is assigned to the same node as its active? Or a 
`StreamsClientAssignment` returns an unknown `ProcessId`? (Not sure if 
this list of potential issues is complete or not...)
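
To make "invalid" concrete, the runtime would presumably need checks along 
these lines (hypothetical sketch; the accessor names are assumptions, not 
part of the KIP text):

    static void validate(TaskAssignment assignment,
                         Set<TaskId> allTasks,
                         Set<ProcessId> knownClients) {
        Set<TaskId> activeTasks = new HashSet<>();
        for (StreamsClientAssignment client : assignment.assignment()) {
            if (!knownClients.contains(client.processId()))
                throw new IllegalStateException("unknown ProcessId " + client.processId());
            for (AssignedTask task : client.tasks()) {
                // an active task assigned to two clients is invalid
                if (task.type() == AssignedTask.Type.ACTIVE && !activeTasks.add(task.id()))
                    throw new IllegalStateException("task assigned twice: " + task.id());
            }
        }
        if (!activeTasks.containsAll(allTasks))
            throw new IllegalStateException("some tasks were not assigned");
    }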




-Matthias



On 4/18/24 2:05 AM, Bruno Cadonna wrote:

Hi Sophie,

Thanks for the clarifications!

(1)
What about replacing Node* with KafkaStreams* or StreamsClient*? I 
prefer KafkaStreams* since that class represents the Kafka Streams 
client. I am also fine with KafkaStreamsClient*. I really would like to 
avoid introducing a new term in Kafka Streams for which we already have 
an equivalent term, even if it is used on the brokers, since that is a 
different level of abstraction. Additionally, I have never been a big 
fan of the term "instance".


(4)
I think the question is whether we need to retrieve assignment metadata by 
task for a Kafka client or whether it is enough to iterate over the assigned 
tasks. Could you explain why we cannot add 

Re: [VOTE] KIP-1037: Allow WriteTxnMarkers API with Alter Cluster Permission

2024-04-19 Thread Justine Olshan
Hey Nikhil,

I meant to comment on the discussion thread, but my draft took so long, you
opened the vote.

Regardless, I just wanted to say that it makes sense to me. +1 (binding)

Justine

On Fri, Apr 19, 2024 at 7:22 AM Nikhil Ramakrishnan <
ramakrishnan.nik...@gmail.com> wrote:

> [...]


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-19 Thread Rohan Desai
Bruno, I've incorporated your feedback into the KIP document.

On Fri, Apr 19, 2024 at 3:55 PM Rohan Desai  wrote:

> [...]

Re: [DISCUSS] KIP-932: Queues for Kafka

2024-04-19 Thread Jun Rao
Hi, Andrew,

Thanks for the reply. A few more comments.

120. There is still reference to ConsumerGroupMetadataKey.

121. ShareUpdateValue.SnapshotEpoch: Should we change it since it's not a
snapshot?

122. ConsumerGroupMemberMetadataValue includes epoch, but
ShareGroupMemberMetadataValue does not. Do we know how the epoch is being
used in the consumer group and whether it's needed in the share group?

123. There is no equivalent of ConsumerGroupTargetAssignmentMember for
ShareGroup. How does the shareGroup persist the member assignment?

124. Assign a share-partition: "When a topic-partition is assigned to a
member of a share group for the first time, the group coordinator writes a
ShareGroupPartitionMetadata record to the __consumer_offsets  topic".
  When does the coordinator write ShareGroupMetadata with the bumped epoch?
In general, is there a particular order to bump up the epoch in different
records? When can the new epoch be exposed to the sharePartitionLeader? It
would be useful to make this clear in other state changing operations too.

125. "Alter share group offsets Group coordinator and share coordinator
Only empty share groups support this operation. The group coordinator sends
an InitializeShareGroupState  request to the share coordinator. The share
coordinator writes a ShareSnapshot record with the new state epoch to the
__share_group_state  topic."
  Does the operation need to write ShareGroupPartitionMetadata and
ShareGroupMetadata (for the new epoch)?

126. Delete share group: Does this operation also need to write a tombstone
for the ShareGroupMemberMetadata record?

127. I was thinking about the impact on a new consumer joining the
shareGroup. This causes the GroupCoordinator and the ShareCoordinator to
bump up the group epoch, which in turn causes the SharePartition leader to
reinitialize the state with ReadShareGroupOffsetsState. If many consumers
are joining a shareGroup around the same time, would there be too much load
for the ShareCoordinator and SharePartition leader?

128. Since group.share.enable will be replaced by the group.version
feature, should we make it an internal config?

129. group.coordinator.rebalance.protocols: this seems redundant with
 group.share.enable?

130. Why do we have both group.share.record.lock.duration.ms and
group.share.record.lock.duration.max.ms, each with its own max value?

131. group.share.record.lock.partition.limit defaults to 200. This limits
the max degree of consumer parallelism to 200, right? If there are multiple
records per batch, it could be even smaller.

132. Why do we need all three of group.share.session.timeout.ms,
group.share.min.session.timeout.ms and group.share.max.session.timeout.ms?
Session timeout can't be set by the client. Ditto for
group.share.heartbeat.interval.ms.

133. group.share.max.size: Would group.share.max.members.per.group be a
more intuitive name?

134. group.share.assignors: Why does it need to be a list?

135. share.coordinator.threads: Is that per share coordinator or per broker?

Jun

On Tue, Apr 16, 2024 at 3:21 AM Andrew Schofield 
wrote:

> Hi Jun,
> Thanks for your reply.
>
> 42.1. That’s a sensible improvement. Done.
>
> 47,56. Done. All instances of BaseOffset changed to FirstOffset.
>
> 105. I think that would be in a future KIP. Personally, I don’t mind having
> a non-contiguous set of values in this KIP.
>
> 114. Done.
>
> 115. If the poll is just returning a single record because there is not
> much
> data to consume, committing on every record is OK. It’s inefficient but
> acceptable.
> If the first poll returns just one record, but many more have piled up
> while
> the first one was being processed, the next poll has the opportunity to
> return
> a bunch of records and then these will be able to be committed together.
> So, my answer is that optimisation on the broker to return batches of
> records when the records are available is the approach we will take here.
>
> 116. Good idea. Done.
>
> 117. I’ve rewritten the Kafka Broker Migration section. Let me know what
> you think.
> I am discussing the configuration to enable the feature in the mailing
> list with
> David Jacot also, so I anticipate a bit of change in this area still.
>
> Thanks,
> Andrew
>
> > On 15 Apr 2024, at 23:34, Jun Rao  wrote:
> >
> > Hi, Andrew,
> >
> > Thanks for the updated KIP.
> >
> > 42.1 "If the share group offset is altered multiple times when the group
> > remains empty, it would be harmless if the same state epoch was reused to
> > initialize the state."
> > Hmm, how does the partition leader know for sure that it has received the
> > latest share group offset if epoch is reused?
> > Could we update the section "Group epoch - Trigger a rebalance" that
> > AdminClient.alterShareGroupOffsets causes the group epoch to be bumped
> too?
> >
> > 47,56 "my view is that BaseOffset should become FirstOffset in ALL
> schemas
> > defined in the KIP."
> > Yes, that seems better to me.
> >
> > 105. "I have another non-terminal 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-19 Thread Rohan Desai
Thanks for the feedback Bruno! For the most part I think it makes sense,
but leaving a couple follow-up thoughts/questions:

re 4: I think Sophie's point was slightly different - that we might want to
wrap the return type for `assign` in a class so that it's easily extensible.
This makes sense to me. Whether we do that or not, we can have the return
type be a Set instead of a Map as well.

re 6: Yes, it's a callback that's called with the final assignment. I like
your suggested name.

On Fri, Apr 5, 2024 at 12:17 PM Rohan Desai  wrote:

> Thanks for the feedback Sophie!
>
> re1: Totally agree. The fact that it's related to the partition assignor
> is clear from just `task.assignor`. I'll update.
> re3: This is a good point, and something I would find useful personally. I
> think it's worth adding an interface that lets the plugin observe the final
> assignment. I'll add that.
> re4: I like the new `NodeAssignment` type. I'll update the KIP with that.
>
> On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai 
> wrote:
>
>> Thanks for the feedback so far! I think pretty much all of it is
>> reasonable. I'll reply to it inline:
>>
>> > 1. All the API logic is granular at the Task level, except the
>> previousOwnerForPartition func. I’m not clear what’s the motivation
>> behind it, does our controller also want to change how the
>> partitions->tasks mapping is formed?
>> You're right that this is out of place. I've removed this method as it's
>> not needed by the task assignor.
>>
>> > 2. Just on the API layering itself: it feels a bit weird to have the
>> three built-in functions (defaultStandbyTaskAssignment etc) sitting in
>> the ApplicationMetadata class. If we consider them as some default util
>> functions, how about moving those into their own static util
>> methods to separate them from the ApplicationMetadata “fact objects”?
>> Agreed. Updated in the latest revision of the KIP. These have been moved
>> to TaskAssignorUtils
>>
>> > 3. I personally prefer `NodeAssignment` to be a read-only object
>> containing the decisions made by the assignor, including the
>> requestFollowupRebalance flag. For manipulating the half-baked results
>> inside the assignor itself, maybe we can just be flexible to let users use
>> whatever structs / their own classes even, if they like. WDYT?
>> Agreed. Updated in the latest version of the kip.
>>
>> > 1. For the API, thoughts on changing the method signature to return a
>> (non-Optional) TaskAssignor? Then we can either have the default
>> implementation return new HighAvailabilityTaskAssignor or just have a
>> default implementation class that people can extend if they don't want to
>> implement every method.
>> Based on some other discussion, I actually decided to get rid of the
>> plugin interface, and instead use config to specify individual plugin
>> behaviour. So the method you're referring to is no longer part of the
>> proposal.
>>
>> > 3. Speaking of ApplicationMetadata, the javadoc says it's read only but
>> there are methods that return void on it? It's not totally clear to me how
>> that interface is supposed to be used by the assignor. It'd be nice if we
>> could flip that interface such that it becomes part of the output instead
>> of an input to the plugin.
>> I've moved those methods to a util class. They're really utility methods
>> the assignor might want to call to do some default or optimized assignment
>> for some cases like rack-awareness.
>>
>> > 4. We should consider wrapping UUID in a ProcessID class so that we
>> control
>> the interface (there are a few places where UUID is directly used).
>> I like it. Updated the proposal.
>>
>> > 5. What does NodeState#newAssignmentForNode() do? I thought the point
>> was
>> for the plugin to make the assignment? Is that the result of the default
>> logic?
>> It doesn't need to be part of the interface. I've removed it.
>>
>> > re 2/6:
>>
>> I generally agree with these points, but I'd rather hash that out in a PR
>> than in the KIP review, as it'll be clearer what gets used how. It seems to
>> me (committers please correct me if I'm wrong) that as long as we're on the
>> same page about what information the interfaces are returning, that's ok at
>> this level of discussion.
>>
>> On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai 
>> wrote:
>>
>>> Hello All,
>>>
>>> I'd like to start a discussion on KIP-924 (
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams)
>>> which proposes an interface to allow users to plug into the streams
>>> partition assignor. The motivation section in the KIP goes into some more
>>> detail on why we think this is a useful addition. Thanks in advance for
>>> your feedback!
>>>
>>> Best Regards,
>>>
>>> Rohan
>>>
>>>


[jira] [Resolved] (KAFKA-15585) DescribeTopic API

2024-04-19 Thread Calvin Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Calvin Liu resolved KAFKA-15585.

Resolution: Fixed

> DescribeTopic API
> -
>
> Key: KAFKA-15585
> URL: https://issues.apache.org/jira/browse/KAFKA-15585
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Calvin Liu
>Assignee: Calvin Liu
>Priority: Major
>
> Adding the new DescribeTopic API + the admin client and server-side handling.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16589) Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close

2024-04-19 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16589:
--

 Summary: Consider removing `ClusterInstance#createAdminClient` 
since callers are not sure whether they need to call close
 Key: KAFKA-16589
 URL: https://issues.apache.org/jira/browse/KAFKA-16589
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


Sometimes we close the admin created by `createAdminClient`, and sometimes we 
don't. That is not a real problem, since `ClusterInstance` will call `close` 
when stopping.

However, it causes a lot of inconsistent code, and in fact it does not save 
much time, since creating an Admin is not hard work. We can get 
`bootstrapServers` and `bootstrapControllers` from `ClusterInstance` easily.

 
{code:java}
// before
try (Admin admin = cluster.createAdminClient()) { }

// after v0
try (Admin admin = Admin.create(Collections.singletonMap(
        CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
        cluster.bootstrapServers()))) { }

{code}
Personally, I don't find the `after` version too verbose, but we could also 
offer an alternative: a `Map<String, Object> clientConfigs` accessor.

 
{code:java}
// after v1
try (Admin admin = Admin.create(cluster.clientConfigs())) { }
{code}
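
A sketch of the helper this would require (hypothetical; the ticket only 
proposes the idea, names and shape to be decided):
{code:java}
// Hypothetical default method on ClusterInstance, not existing API:
default Map<String, Object> clientConfigs() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
    return configs;
}
{code}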



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16486) Integrate metric measurability changes in metrics collector

2024-04-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16486.
-
Fix Version/s: 3.8.0
   Resolution: Done

> Integrate metric measurability changes in metrics collector
> ---
>
> Key: KAFKA-16486
> URL: https://issues.apache.org/jira/browse/KAFKA-16486
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-19 Thread Frédérik Rouleau
Hi everyone,

Thanks for all that valuable feedback.
So we have a consensus not to use Record.

I have updated the PR by creating 2 child classes,
KeyDeserializationException and ValueDeserializationException. Those
classes directly embed the required fields. I do not think a wrapper object
would be useful in that case.
I still had to update checkstyle, as the Headers class is not allowed for
import in the Errors package. I do not think it's an issue to add that
authorization, as Headers is already used in ConsumerRecord, which is
already public.
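
For illustration, the rough shape of one of the child classes (a sketch 
based on this thread; see the PR for the real fields and constructors):

    public class KeyDeserializationException extends RecordDeserializationException {
        private final byte[] keyData;
        private final Headers headers;

        public KeyDeserializationException(TopicPartition partition, long offset,
                                           byte[] keyData, Headers headers, Throwable cause) {
            super(partition, offset, "Error deserializing key", cause);
            this.keyData = keyData;
            this.headers = headers;
        }

        public byte[] keyData() { return keyData; }
        public Headers headers() { return headers; }
    }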

The proposed PR https://github.com/apache/kafka/pull/15691/files

If it's ok I will update the KIP.

Regards,
Fred


Re: [VOTE] KIP-1020 Move `window.size.ms` and `windowed.inner.serde.class` from `StreamsConfig` to TimeWindowedDe/Serializer class

2024-04-19 Thread Lucia Cerchie
Thanks all for voting.

I'm now closing the vote. The vote passes with
- 3 +1 binding votes from Lucas Brutschy, Sophie Blee-Goldman, and
Matthias J Sax
- 0 +1 non-binding votes
- 0 -1 votes


Thanks,
Lucia Cerchie

On Mon, Apr 8, 2024 at 2:08 AM Lucas Brutschy
 wrote:

> +1 (binding)
>
> Thanks, Lucia!
>
> On Wed, Apr 3, 2024 at 11:35 PM Sophie Blee-Goldman
>  wrote:
> >
> > +1 (binding)
> >
> > Thanks Lucia!
> >
> > On Tue, Apr 2, 2024 at 12:23 AM Matthias J. Sax 
> wrote:
> >
> > > +1 (binding)
> > >
> > >
> > > -Matthias
> > >
> > > On 4/1/24 7:44 PM, Lucia Cerchie wrote:
> > > > Hello everyone,
> > > >
> > > > I'd like to call a vote on KIP-1020
> > > > <
> > >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804
> > > >.
> > > > It has been under discussion since Feb 15, and has received edits to
> the
> > > > KIP and approval by discussion participants.
> > > >
> > > > Best,
> > > > Lucia Cerchie
> > > >
> > >
>


-- 
Lucia Cerchie
Developer Advocate


[jira] [Created] (KAFKA-16588) broker shutdown hangs when `log.segment.delete.delay.ms` is zero

2024-04-19 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16588:
--

 Summary: broker shutdown hangs when `log.segment.delete.delay.ms` 
is zero 
 Key: KAFKA-16588
 URL: https://issues.apache.org/jira/browse/KAFKA-16588
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


see 
https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/core/src/main/scala/kafka/log/LogManager.scala#L1154

We call `take` even when `logsToBeDeleted` is empty, and 
`KafkaScheduler#shutdown` calls `shutdown` rather than `shutdownNow` 
(https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134).

Hence, the thread never completes, and it blocks the shutdown of the broker.

We should replace the `take` with `poll`, since we have already checked the element before.
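
A sketch of the fix (Java for illustration; the real code is Scala in 
LogManager, and `LogToDelete`/`deleteLog` are placeholder names):
{code:java}
// take() blocks forever when the queue is empty, hanging broker shutdown:
//   logsToBeDeleted.take();
// poll() returns null instead of blocking, since emptiness was already checked:
LogToDelete entry = logsToBeDeleted.poll();
if (entry != null) {
    deleteLog(entry);
}
{code}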

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16579) Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer

2024-04-19 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-16579.

Resolution: Fixed

> Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer
> -
>
> Key: KAFKA-16579
> URL: https://issues.apache.org/jira/browse/KAFKA-16579
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, system tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Critical
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated 
> a slew of system tests to run both the "old" and "new" implementations. 
> KAFKA-16271 updated the system tests in {{consumer_rolling_upgrade_test.py}} 
> so it could test the new consumer. However, the test is tailored specifically 
> to the "old" Consumer's protocol and assignment strategy upgrade.
> Unsurprisingly, when we run those system tests with the new 
> {{AsyncKafkaConsumer}}, we get errors like the following:
> {code}
> test_id:
> kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   29.634 seconds
> AssertionError("Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1)})}")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 77, in rolling_update_test
> self._verify_range_assignment(consumer)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py",
>  line 41, in _verify_range_assignment
> "Mismatched assignment: %s" % assignment
> AssertionError: Mismatched assignment: {frozenset(), 
> frozenset({TopicPartition(topic='test_topic', partition=0), 
> TopicPartition(topic='test_topic', partition=1)})}
> {code}
> The task here is to revert the changes made in KAFKA-16271.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-950: Tiered Storage Disablement

2024-04-19 Thread Christo Lolov
Heya all!

I have updated KIP-950. A list of what I have updated is:

* Explicitly state that Zookeeper-backed clusters will have ENABLED ->
DISABLING -> DISABLED while KRaft-backed clusters will only have ENABLED ->
DISABLED
* Added two configurations for the new thread pools and explained where
values will be picked up mid Kafka version upgrade
* Explained how leftover remote partitions will be scheduled for deletion
* Updated the API to use StopReplica V5 rather than a whole new
controller-to-broker API
* Explained that the disablement procedure will be triggered by the
controller listening for an (Incremental)AlterConfig change
* Explained that we will first move log start offset and then issue a
deletion
* Went into more details that changing remote.log.disable.policy after
disablement won't do anything and that if a customer would like additional
data deleted they would have to use already existing methods

Let me know if there are any new comments or I have missed something!

Best,
Christo

On Mon, 15 Apr 2024 at 12:40, Christo Lolov  wrote:

> Heya Doguscan,
>
> I believe that the state of the world after this KIP will be the following:
>
> For Zookeeper-backed clusters there will be 3 states: ENABLED, DISABLING
> and DISABLED. We want this because Zookeeper-backed clusters will await a
> confirmation from the brokers that they have indeed stopped tiered-related
> operations on the topic.
>
> For KRaft-backed clusters there will be only 2 states: ENABLED and
> DISABLED. KRaft takes a fire-and-forget approach for topic deletion. I
> believe the same approach ought to be taken for tiered topics. The
> mechanism which will ensure that leftover state in remote due to failures
> is cleaned up to me is the retention mechanism. In today's code, a leader
> deletes all segments it finds in remote with offsets below the log start
> offset. I believe this will be good enough for cleaning up leftover state
> in remote due to failures.
>
> I know that quite a few changes have been discussed so I will aim to put
> them on paper in the upcoming days and let everyone know!
>
> Best,
> Christo
>
> On Tue, 9 Apr 2024 at 14:49, Doğuşcan Namal 
> wrote:
>
>> +1 let's not introduce a new api and mark it immediately as deprecated :)
>>
>> On your second comment Luke, one thing we need to clarify is when do we
>> consider remote storage to be DISABLED for a topic?
>> Particularly, what is the state when the remote storage is being deleted
>> in case of disablement.policy=delete? Is it DISABLING or DISABLED?
>>
>> If we move directly to the DISABLED state,
>>
>> a) in case of failures, the leaders should continue remote storage
>> deletion even if the topic is moved to the DISABLED state, otherwise we
>> risk having stray data on remote storage.
>> b) on each restart, we should initiate the remote storage deletion
>> because although we replayed a record with a DISABLED state, we can not be
>> sure if the remote data is deleted or not.
>>
>> We could either consider keeping the remote topic in DISABLING state
>> until all of the remote storage data is deleted, or we need an additional
>> mechanism to handle the remote stray data.
>>
>> The existing topic deletion, for instance, handles stray logs on disk by
>> detecting them on KafkaBroker startup and deleting before the
>> ReplicaManager is started.
>> Maybe we need a similar mechanism here as well if we don't want a
>> DISABLING state. Otherwise, we need a callback from Brokers to validate
>> that remote storage data is deleted and now we could move to the DISABLED
>> state.
>>
>> Thanks.
>>
>> On Tue, 9 Apr 2024 at 12:45, Luke Chen  wrote:
>>
>>> Hi Christo,
>>>
>>> > I would then opt for moving information from DisableRemoteTopic
>>> within the StopReplicas API which will then disappear in KRaft world as
>>> it
>>> is already scheduled for deprecation. What do you think?
>>>
>>> Sounds good to me.
>>>
>>> Thanks.
>>> Luke
>>>
>>> On Tue, Apr 9, 2024 at 6:46 PM Christo Lolov 
>>> wrote:
>>>
>>> > Heya Luke!
>>> >
>>> > I thought a bit more about it and I reached the same conclusion as you
>>> for
>>> > 2 as a follow-up from 1. In other words, in KRaft world I don't think
>>> the
>>> > controller needs to wait for acknowledgements for the brokers. All we
>>> care
>>> > about is that the leader (who is responsible for archiving/deleting
>>> data in
>>> > tiered storage) knows about the change and applies it properly. If
>>> there is
>>> > a leadership change halfway through the operation then the new leader
>>> still
>>> > needs to apply the message from the state topic and we know that a
>>> > disable-message will be applied before a reenablement-message. I will
>>> > change the KIP later today/tomorrow morning to reflect this reasoning.
>>> >
>>> > However, with this I believe that introducing a new API just for
>>> > Zookeeper-based clusters (i.e. DisableRemoteTopic) becomes a bit of an
>>> > overkill. I would then opt for moving information from
>>> 

[VOTE] KIP-1037: Allow WriteTxnMarkers API with Alter Cluster Permission

2024-04-19 Thread Nikhil Ramakrishnan
Hi everyone,

I would like to start a voting thread for KIP-1037: Allow
WriteTxnMarkers API with Alter Cluster Permission
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-1037%3A+Allow+WriteTxnMarkers+API+with+Alter+Cluster+Permission)
as there have been no objections on the discussion thread.

For comments or feedback please check the discussion thread here:
https://lists.apache.org/thread/bbkyt8mrc8xp3jfyvhph7oqtjxl29xmn

Thanks,
Nikhil


Re: [ANNOUNCE] New Kafka PMC Member: Greg Harris

2024-04-19 Thread Lucas Brutschy
Congrats!

On Thu, Apr 18, 2024 at 6:50 PM Justine Olshan
 wrote:
>
> Congratulations Greg!
>
> On Thu, Apr 18, 2024 at 12:03 AM Matthias J. Sax  wrote:
>
> > Congrats Greg!
> >
> > On 4/15/24 10:44 AM, Hector Geraldino (BLOOMBERG/ 919 3RD A) wrote:
> > > Congrats! Well deserved
> > >
> > > From: dev@kafka.apache.org At: 04/13/24 14:42:22 UTC-4:00To:
> > dev@kafka.apache.org
> > > Subject: [ANNOUNCE] New Kafka PMC Member: Greg Harris
> > >
> > > Hi all,
> > >
> > > Greg Harris has been a Kafka committer since July 2023. He has remained
> > > very active and instructive in the community since becoming a committer.
> > > It's my pleasure to announce that Greg is now a member of Kafka PMC.
> > >
> > > Congratulations, Greg!
> > >
> > > Chris, on behalf of the Apache Kafka PMC
> > >
> > >
> >


Re: [DISCUSS] KIP-1039: Disable automatic topic creation for MirrorMaker2 consumers

2024-04-19 Thread aaron ai
Hi Omnia,
Thanks for your feedback!

Yes, another approach could be to disable this option manually. IMHO, it's
hard to envision a scenario where enabling it would be necessary. BTW, I've
already added this part to the KIP.
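
For reference, the manual workaround would look roughly like this in the 
MirrorMaker2 properties (illustrative; `source` stands for the source-cluster 
alias):

    # disable client-side topic auto-creation for MM2's source consumers
    source.consumer.allow.auto.create.topics = false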

On Fri, Apr 19, 2024 at 6:18 PM Omnia Ibrahim 
wrote:

> [...]


Re: [DISCUSS] KIP-1039: Disable automatic topic creation for MirrorMaker2 consumers

2024-04-19 Thread Omnia Ibrahim
Hi Aaron,
You mentioned that there are no public interface changes; however, changing 
the default value of a config should be considered a public change. You can 
check other KIPs where we changed a default config value for reference.

Can you please list the impact of changing the topic-creation behaviour, as 
well as any rejected alternatives? For example, couldn't customers disable 
allow.auto.create.topics manually as a workaround? 

Thanks
Omnia

> On 19 Apr 2024, at 10:37, aaron ai  wrote:
> [...]



[DISCUSS] KIP-1039: Disable automatic topic creation for MirrorMaker2 consumers

2024-04-19 Thread aaron ai
Hi all,

Here is the KIP-1039: Disable automatic topic creation for MirrorMaker2
consumers
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1039%3A+Disable+automatic+topic+creation+for+MirrorMaker2+consumers
>

Looking forward to your feedback!

Best regards,
Aaron


Re: [DISCUSS] KIP-1028: Docker Official Image for Apache Kafka

2024-04-19 Thread Manikumar
Thanks Krish. KIP looks good to me.

On Wed, Apr 17, 2024 at 1:38 PM Krish Vora  wrote:
>
> Hi Manikumar,
>
> Thanks for the comments.
>
> Maybe as part of the release process, RM can create a JIRA for this
> > task. This can be taken by the RM or any committer or any contributor (with
> > some help from committers to run "Docker Image Preparation via GitHub
> > Actions:"
>
> This sounds like a good idea; this step would be beneficial. Creating a
> JIRA ticket will also serve as a reminder to complete the post-release
> steps for the Docker Official Images. I have updated the KIP with this step.
>
> Is this using GitHub Actions workflow? or manual testing?
>
> This will be done by a Github Actions workflow, which will test the static
> Docker Official Image assets for a specific release version.
>
> Is it mandatory for RM/committers to raise the PR to Docker Hub’s
> > official images repository (or) can it be done by any contributor.
>
> I believe that it can be done by any contributor (ref: this link
> quotes "*Anyone can provide feedback, contribute code, suggest process
> changes, or even propose a new Official Image.*")
>
> Also I was thinking, once the KIP gets voted, we should try to release
> > kafka:3.7.0 (or 3.7.1) Docker Official image. This will help us to
> > validate the process and allow us to fix any changes suggested by
> > Dockerhub before the 3.8.0 release.
>
> This sounds like a great idea. This KIP proposes release of DOI as a
> post-release process, which can be done anytime post release. Since 3.7.0
> is already released, we can perform these steps for that release too. By
> the time the KIP gets implemented, if 3.7.1 is released, we could do these
> steps for 3.7.1, instead of 3.7.0. This would allow us to make changes to
> the Dockerfiles and other assets based on feedback from Docker Hub before
> the release of version 3.8.0.
>
> Thanks,
> Krish.
>
> On Mon, Apr 15, 2024 at 12:59 PM Manikumar 
> wrote:
>
> > Hi Krish,
> >
> > Thanks for the updated KIP. a few comments below.
> >
> > > "These actions can be carried out by the RM or any contributor post the
> > release process."
> > Maybe as part of the release process, RM can create a JIRA for this
> > task. This can be taken by the RM or any committer or any contributor (with
> > some help from committers to run "Docker Image Preparation via GitHub
> > Actions:"
> >
> > > "Perform Docker build tests to ensure image integrity"
> > Is this using GitHub Actions workflow? or manual testing?
> >
> > > "The RM will manually raise the final PR to Docker Hub’s official images
> > repository using the contents of the generated file"
>  Is it mandatory for RM/committers to raise the PR to Docker Hub’s
> > official images repository (or) can it be done by any contributor.
> >
> > Also I was thinking, once the KIP gets voted, we should try to release
> > kafka:3.7.0 (or 3.7.1) Docker Official image. This will help us to
> > validate the process and allow us to fix any changes suggested by
> > Dockerhub before the 3.8.0 release.
> >
> >
> > Thanks,
> >
> > On Mon, Apr 8, 2024 at 2:33 PM Krish Vora  wrote:
> > >
> > > Hi Manikumar and Luke.
> > > Thanks for the questions.
> > >
> > > 1. No, the Docker inventory files and configurations will not be the same
> > > for Open Source Software (OSS) Images and Docker Official Images (DOI).
> > >
> > > For OSS images, the Dockerfile located in docker/jvm/dockerfile is
> > > utilized. This process is integrated with the existing release pipeline
> > as
> > > outlined in KIP-975
> > > <
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka#KIP975:DockerImageforApacheKafka-Status
> > >,
> > > where the Kafka URL is provided as a build argument. This method allows
> > for
> > > building, testing, and releasing OSS images dynamically. The OSS images
> > > will continue to be released under the standard release process.
> > >
> > > In contrast, the release process for DOIs requires providing the Docker
> > Hub
> > > team with a specific directory for each version release that contains a
> > > standalone Dockerfile. These Dockerfiles are designed to be
> > > self-sufficient, hence require hardcoded values instead of relying on
> > build
> > > arguments. To accommodate this, in our proposed approach, a new directory
> > > named docker_official_images has been created. This directory contains
> > > version-specific directories, having Dockerfiles with hardcoded
> > > configurations for each release, acting as the source of truth for DOI
> > > releases. The hardcoded dockerfiles will be created using the
> > > docker/jvm/dockerfile as a template. Thus, as part of post release we
> > will
> > > be creating a Dockerfile that will be reviewed by the Dockerhub community
> > > and might need changes as per their review. This approach ensures that
> > DOIs
> > > are built consistently and meet the specific 

[jira] [Created] (KAFKA-16587) Store subscription model for consumer group in group state

2024-04-19 Thread Ritika Reddy (Jira)
Ritika Reddy created KAFKA-16587:


 Summary: Store subscription model for consumer group in group state
 Key: KAFKA-16587
 URL: https://issues.apache.org/jira/browse/KAFKA-16587
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ritika Reddy
Assignee: Ritika Reddy


Currently we iterate through all the subscribed topics for each member in the 
consumer group to determine whether all the members are subscribed to the same 
set of topics, i.e., whether the group has a homogeneous subscription model.

Instead of iterating and comparing the topicIds on every rebalance, we want to 
maintain this information in the group state.
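
Conceptually, the check performed today looks like this (hedged sketch; the 
member accessor name is an assumption):
{code:java}
// Recomputed on every rebalance today; this ticket proposes maintaining
// the result in the group state instead.
boolean homogeneous = members.values().stream()
        .map(member -> member.subscribedTopicNames())  // assumed accessor
        .distinct()
        .count() <= 1;
{code}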



--
This message was sent by Atlassian Jira
(v8.20.10#820010)