Re: Apache Kafka 3.6.0 release

2023-07-21 Thread Satish Duggana
Thanks Hao for the update on KIP-925.

On Thu, 20 Jul 2023 at 23:05, Hao Li  wrote:
>
> Hi Satish,
>
> KIP-925 was accepted and currently under implementation. I just added it to
> the release plan.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
>
> Thanks,
> Hao
>
> On Thu, Jul 20, 2023 at 6:18 AM Christo Lolov 
> wrote:
>
> > Hello!
> >
> > A couple of days ago I opened a new KIP for discussion - KIP-952 [1]. I
> > believe it might be a blocker for the release of 3.6.0, but I wanted to
> > bring it up here for a decision on its urgency with the current set of
> > people who are looking at Tiered Storage (Satish, Luke, Ivan, Divij) given
> > that the date for KIP freeze is fast approaching.
> > What are your thoughts on the matter?
> >
> > [1]
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-952%3A+Regenerate+segment-aligned+producer+snapshots+when+upgrading+to+a+Kafka+version+supporting+Tiered+Storage
> >
> > Best,
> > Christo
> >
> > On Sat, 8 Jul 2023 at 13:06, Satish Duggana 
> > wrote:
> >
> > > Hi Yash,
> > > Thanks for the update. Added KIP-793 to the release plan. Please feel
> > > free to update the release wiki with any other updates on the KIP.
> > >
> > > ~Satish.
> > >
> > > On Fri, 7 Jul 2023 at 10:52, Yash Mayya  wrote:
> > > >
> > > > Hi Satish,
> > > >
> > > > KIP-793 [1] just passed voting and we should be able to wrap up the
> > > > implementation in time for the 3.6.0 feature freeze. Could we add it to
> > > the
> > > > release plan?
> > > >
> > > > [1] -
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-793%3A+Allow+sink+connectors+to+be+used+with+topic-mutating+SMTs
> > > >
> > > > Thanks,
> > > > Yash
> > > >
> > > > On Mon, Jun 12, 2023 at 3:52 PM Satish Duggana <
> > satish.dugg...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > I have created a release plan for Apache Kafka version 3.6.0 on the
> > > > > wiki. You can access the release plan and all related information by
> > > > > following this link:
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+3.6.0
> > > > >
> > > > > The release plan outlines the key milestones and important dates for
> > > > > version 3.6.0. Currently, the following dates have been set for the
> > > > > release:
> > > > >
> > > > > KIP Freeze: 26th July 23
> > > > > Feature Freeze : 16th Aug 23
> > > > > Code Freeze : 30th Aug 23
> > > > >
> > > > > Please review the release plan and provide any additional information
> > > > > or updates regarding KIPs targeting version 3.6.0. If you have
> > > > > authored any KIPs that are missing a status or if there are incorrect
> > > > > status details, please make the necessary updates and inform me so
> > > > > that I can keep the plan accurate and up to date.
> > > > >
> > > > > Thanks,
> > > > > Satish.
> > > > >
> > > > > On Mon, 17 Apr 2023 at 21:17, Luke Chen  wrote:
> > > > > >
> > > > > > Thanks for volunteering!
> > > > > >
> > > > > > +1
> > > > > >
> > > > > > Luke
> > > > > >
> > > > > > On Mon, Apr 17, 2023 at 2:03 AM Ismael Juma 
> > > wrote:
> > > > > >
> > > > > > > Thanks for volunteering Satish. +1.
> > > > > > >
> > > > > > > Ismael
> > > > > > >
> > > > > > > On Sun, Apr 16, 2023 at 10:08 AM Satish Duggana <
> > > > > satish.dugg...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > > I would like to volunteer as release manager for the next
> > > release,
> > > > > > > > which will be Apache Kafka 3.6.0.
> > > > > > > >
> > > > > > > > If there are no objections, I will start a release plan a week
> > > after
> > > > > > > > 3.5.0 release(around early May).
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Satish.
> > > > > > > >
> > > > > > >
> > > > >
> > >
> >


Re: [DISCUSS] KIP-953: partition method to be overloaded to accept headers as well.

2023-07-21 Thread Jack Tomy
Hey @Sagar,

Thank you again for the response and feedback.

   1. Though the ask wasn't very clear to me I have attached the Javadoc as
   per your suggestion. Please have a look and let me know if this meets the
   expectations.
   2. Done.
   3. Done
   4. Done

Hey @Sagar and everyone,
Please have a look at the new version and share your thoughts.
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937

On Thu, Jul 20, 2023 at 9:46 PM Sagar  wrote:

> Thanks Jack for the updates.
>
> Some more feedback:
>
> 1) It would be better if you can add the Javadoc in the Public interfaces
> section. That is a general practice used which gives the readers of the KIP
> a high level idea of the Public Interfaces.
>
> 2) In the proposed section, the bit about marking headers as read only
> seems like an implementation detail This can generally be avoided in KIPs.
>
> 3) Also, in the Deprecation section, can you mention again that this is a
> backward compatible change and the reason for it (already done in the
> Proposed Changes section).
>
> 4) In the Testing Plan section, there is still the KIP template bit copied
> over. That can be removed.
>
> Thanks!
> Sagar.
>
>
> On Thu, Jul 20, 2023 at 2:48 PM Jack Tomy  wrote:
>
> > Hey Everyone,
> >
> > Please consider this as a reminder and share your feedback. Thank you.
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937
> >
> > On Tue, Jul 18, 2023 at 5:43 PM Jack Tomy  wrote:
> >
> > > Hey @Sagar,
> > >
> > > Thank you for the response and feedback.
> > >
> > >1. Done
> > >2. Yeah, that was a mistake from my end. Corrected.
> > >3. Can you please elaborate this, I have added the java doc along
> with
> > >the code changes. Should I paste the same in KIP too?
> > >4. Moved.
> > >5. I have added one more use case, it is actually helpful in any
> > >situation where you want to pass some information to partition
> method
> > but
> > >don't have to have it in the key or value.
> > >6. Added.
> > >
> > >
> > > Hey @Sagar and everyone,
> > > Please have a look at the new version and share your thoughts.
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937
> > >
> > >
> > > On Tue, Jul 18, 2023 at 9:53 AM Sagar 
> wrote:
> > >
> > >> Hi Jack,
> > >>
> > >> Thanks for the KIP! Seems like an interesting idea. I have some
> > feedback:
> > >>
> > >> 1) It would be great if you could clean up the text that seems to
> mimic
> > >> the
> > >> KIP template. It is generally not required in the KIP.
> > >>
> > >> 2) In the Public Interfaces where you mentioned *Partitioner method in
> > >> **org/apache/kafka/clients/producer
> > >> will have the following update*, I believe you meant the Partitioner
> > >> *interface*?
> > >>
> > >> 3) Staying on Public Interface, it is generally preferable to add a
> > >> Javadocs section along with the newly added method. You could also
> > >> describe
> > >> the behaviour of it invoking the default existing method.
> > >>
> > >> 4) The option that is mentioned in the Rejected Alternatives, seems
> more
> > >> like a workaround to the current problem that you are describing. That
> > >> could be added to the Motivation section IMO.
> > >>
> > >> 5) Can you also add some more examples of scenarios where this would
> be
> > >> helpful? The only scenario mentioned seems to have a workaround. Just
> > >> trying to ensure that we have a strong enough motivation before
> adding a
> > >> public API.
> > >>
> > >> 6) One thing which should also be worth noting down would be what
> > happens
> > >> if users override both methods, only one method (new or old) and no
> > >> methods
> > >> (the default behaviour). It would help in understanding the proposal
> > >> better.
> > >>
> > >> Thanks!
> > >> Sagar.
> > >>
> > >>
> > >> On Mon, Jul 17, 2023 at 9:19 PM Jack Tomy 
> > wrote:
> > >>
> > >> > Hey everyone,
> > >> >
> > >> > Not seeing much discussion on the KPI. Might be because it is too
> > >> > obvious .
> > >> >
> > >> > If there are no more comments, I will start the VOTE in the coming
> > days.
> > >> >
> > >> > On Sat, Jul 15, 2023 at 8:48 PM Jack Tomy 
> > >> wrote:
> > >> >
> > >> > > Hey everyone,
> > >> > >
> > >> > > Please take a look at the KPI below and provide your suggestions
> and
> > >> > > feedback. TIA.
> > >> > >
> > >> > >
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263424937
> > >> > >
> > >> > >
> > >> > > --
> > >> > > Best Regards
> > >> > > *Jack*
> > >> > >
> > >> >
> > >> >
> > >> > --
> > >> > Best Regards
> > >> > *Jack*
> > >> >
> > >>
> > >
> > >
> > > --
> > > Best Regards
> > > *Jack*
> > >
> >
> >
> > --
> > Best Regards
> > *Jack*
> >
>


-- 
Best Regards
*Jack*


Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #2027

2023-07-21 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-21 Thread Matthias J. Sax
I agree that it could easily be misused. There is a few Jira tickets for 
cases when people want to "cancel" a repartition step. I would hope 
those tickets are linked to the KIP (if not, we should do this, and 
maybe even c those cases as motivation into the KIP itself)?


It's always a tricky question to what extend we want to guide users, and 
to what extend we need to give levers for advances case (and how to 
design those levers...) It's for sure a good idea to call out "use with 
case" in the JavaDocs for the new method.



-Matthias

On 7/21/23 3:34 PM, Sophie Blee-Goldman wrote:

I guess I felt a bit uneasy about how this could be used/abused while
reading the KIP, but if we truly believe this is an advanced feature, I'm
fine with the way things currently are. It doesn't feel like the best API,
but it does seem to be the best *possible* API given the way things are.

W.r.t the KTable notes, that all makes sense to me. I just wanted to lay
out all the potential cases to make sure we had our bases covered.

I still think an example or two would help, but the only thing I will
actually wait on before feeling comfortable enough to vote on this would be
a clear method signature (and maybe sample javadocs) in the "Public
Interfaces" section.

Thanks again for the KIP Shay! Hope I haven't dragged it out too much

On Fri, Jul 21, 2023 at 3:19 PM Matthias J. Sax  wrote:


Some thought about the API question.



A. kstream.groupBy(...).aggregate(...)


This can be re-writtten as

kstream.selectKey(...)
 .markAsRepartitioned()
 .groupByKey()
 .aggregate()

Given that `markAsRepartitoned` is an advanced feature, I think it would
be ok?



B. ktable.groupBy(...).aggregate(...)


For KTable aggregation, not sure how useful it would be? In the end, an
table aggregation does only make sense if we pick something from the
value, ie, we indeed change the key?



C. kstream.selectKey(...).join(ktable)


We can just insert a `markAsRepartitioned()` after `selectKey` to avoid
repartitioning of the left input KStream.



KStream.selectKey(...).toTable().join(...)


Not sure if I understand what you try to say with this example? In the
end, `selectKey(...).toTable()` would repartiton. If I know that one can
upsert directly, one inserts a `markAsRepartitioned()` in between.


In general, the use case seems to be that the key is not in the right
"format", or there is no key, but data was partitioned by a
value-attribute upstream and we just want to extract this
value-attribute into the key. Both seems to be KStream cases?


-Matthias



On 7/15/23 1:43 PM, Sophie Blee-Goldman wrote:

Hey Shay, while I don't have any specific concerns about the new public

API

in this KIP, I'd like to better understand how this feature will work
before I vote. We should document the behavior of this new operator

clearly

in the KIP as well -- you don't necessarily need to write the complete
javadocs up front, but it should be possible for a user to read the KIP

and

then understand how this feature will work and how they would need to

apply

it.

To that end, I recommend framing this proposal with a few examples to

help

clarify the semantics. When and where can you apply the

markAsPartitioned()

operator? Some suggestions below.

Specific notes:

1. The KIP opens with "Each key changing operation in Kafka Streams
(selectKey, map, transform, etc.) now leads to automatic repartition

before

an aggregation." We should change "aggregation" to "stateful operation"

as

this is true for things like joins as well as aggregations
2. The callout on IQ makes me a bit uncomfortable -- basically it says

this

should not be a concern "if we use markAsPartitioned correctly". Does

this

mean if we, the devs implementing this, write the feature correctly? Or

is

it saying that this won't be a problem as long as "we", the users of this
feature, use it correctly"? Just wondering if you've put any thought into
how this would work yet (I personally have not)
3. The KIP should lay out the proposed API exactly, even if there's only
one new method. Check out this KIP
<

https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL


(or this KIP
<

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=128651808

)
for a good reference on what the Public Interfaces section should include
4. Regarding the proposed API itself, I wonder if KStream is really the
most appropriate interface for the new operator. A repartition can be
triggered on just a KTable. Here's where some examples would help.

Perhaps

we could focus on these three cases:

A. kstream.groupBy(...).aggregate(...)
B. ktable.groupBy(...).aggregate(...)
C. kstream.selectKey(...).join(ktable)

I'm sure someone will correct me if I'm missing any additional vital
examples, but at the very least, these are the three to consider: either

a

KStream or KTable followed by a groupBy/aggregation, or a KStream with
key-changing 

Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2026

2023-07-21 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 393019 lines...]
Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [2] 
false STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [2] 
false PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
SmokeTestDriverIntegrationTest > shouldWorkWithRebalance(boolean) > [1] true 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > 
shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(boolean) > 
[2] false PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > 
shouldProcessDataFromStoresWithLoggingDisabled(boolean) > [1] true STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > 
shouldProcessDataFromStoresWithLoggingDisabled(boolean) > [1] true PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > 
shouldProcessDataFromStoresWithLoggingDisabled(boolean) > [2] false STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > 
shouldProcessDataFromStoresWithLoggingDisabled(boolean) > [2] false PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreNullRecord() STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreNullRecord() PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreStateFromSourceTopic(boolean) > [1] true 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreStateFromSourceTopic(boolean) > [1] true 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreStateFromSourceTopic(boolean) > [2] false 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreStateFromSourceTopic(boolean) > [2] false 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldSuccessfullyStartWhenLoggingDisabled(boolean) > 
[1] true STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldSuccessfullyStartWhenLoggingDisabled(boolean) > 
[1] true PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldSuccessfullyStartWhenLoggingDisabled(boolean) > 
[2] false STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldSuccessfullyStartWhenLoggingDisabled(boolean) > 
[2] false PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [1] 
true STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [1] 
true PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [2] 
false STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [2] 
false PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
SmokeTestDriverIntegrationTest > shouldWorkWithRebalance(boolean) > [1] true 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
SmokeTestDriverIntegrationTest > shouldWorkWithRebalance(boolean) > [1] true 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
SmokeTestDriverIntegrationTest > shouldWorkWithRebalance(boolean) > [2] false 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
SmokeTestDriverIntegrationTest > shouldWorkWithRebalance(boolean) > [1] true 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
SmokeTestDriverIntegrationTest > shouldWorkWithRebalance(boolean) > [2] false 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
SmokeTestDriverIntegrationTest > shouldWorkWithRebalance(boolean) > [2] false 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
StoreQueryIntegrationTest > shouldQuerySpecificActivePartitionStores() STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 180 > 
StoreQueryIntegrationTest > 

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-21 Thread Sophie Blee-Goldman
I guess I felt a bit uneasy about how this could be used/abused while
reading the KIP, but if we truly believe this is an advanced feature, I'm
fine with the way things currently are. It doesn't feel like the best API,
but it does seem to be the best *possible* API given the way things are.

W.r.t the KTable notes, that all makes sense to me. I just wanted to lay
out all the potential cases to make sure we had our bases covered.

I still think an example or two would help, but the only thing I will
actually wait on before feeling comfortable enough to vote on this would be
a clear method signature (and maybe sample javadocs) in the "Public
Interfaces" section.

Thanks again for the KIP Shay! Hope I haven't dragged it out too much

On Fri, Jul 21, 2023 at 3:19 PM Matthias J. Sax  wrote:

> Some thought about the API question.
>
>
> >> A. kstream.groupBy(...).aggregate(...)
>
> This can be re-writtten as
>
> kstream.selectKey(...)
> .markAsRepartitioned()
> .groupByKey()
> .aggregate()
>
> Given that `markAsRepartitoned` is an advanced feature, I think it would
> be ok?
>
>
> >> B. ktable.groupBy(...).aggregate(...)
>
> For KTable aggregation, not sure how useful it would be? In the end, an
> table aggregation does only make sense if we pick something from the
> value, ie, we indeed change the key?
>
>
> >> C. kstream.selectKey(...).join(ktable)
>
> We can just insert a `markAsRepartitioned()` after `selectKey` to avoid
> repartitioning of the left input KStream.
>
>
> > KStream.selectKey(...).toTable().join(...)
>
> Not sure if I understand what you try to say with this example? In the
> end, `selectKey(...).toTable()` would repartiton. If I know that one can
> upsert directly, one inserts a `markAsRepartitioned()` in between.
>
>
> In general, the use case seems to be that the key is not in the right
> "format", or there is no key, but data was partitioned by a
> value-attribute upstream and we just want to extract this
> value-attribute into the key. Both seems to be KStream cases?
>
>
> -Matthias
>
>
>
> On 7/15/23 1:43 PM, Sophie Blee-Goldman wrote:
> > Hey Shay, while I don't have any specific concerns about the new public
> API
> > in this KIP, I'd like to better understand how this feature will work
> > before I vote. We should document the behavior of this new operator
> clearly
> > in the KIP as well -- you don't necessarily need to write the complete
> > javadocs up front, but it should be possible for a user to read the KIP
> and
> > then understand how this feature will work and how they would need to
> apply
> > it.
> >
> > To that end, I recommend framing this proposal with a few examples to
> help
> > clarify the semantics. When and where can you apply the
> markAsPartitioned()
> > operator? Some suggestions below.
> >
> > Specific notes:
> >
> > 1. The KIP opens with "Each key changing operation in Kafka Streams
> > (selectKey, map, transform, etc.) now leads to automatic repartition
> before
> > an aggregation." We should change "aggregation" to "stateful operation"
> as
> > this is true for things like joins as well as aggregations
> > 2. The callout on IQ makes me a bit uncomfortable -- basically it says
> this
> > should not be a concern "if we use markAsPartitioned correctly". Does
> this
> > mean if we, the devs implementing this, write the feature correctly? Or
> is
> > it saying that this won't be a problem as long as "we", the users of this
> > feature, use it correctly"? Just wondering if you've put any thought into
> > how this would work yet (I personally have not)
> > 3. The KIP should lay out the proposed API exactly, even if there's only
> > one new method. Check out this KIP
> > <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> >
> > (or this KIP
> > <
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=128651808
> >)
> > for a good reference on what the Public Interfaces section should include
> > 4. Regarding the proposed API itself, I wonder if KStream is really the
> > most appropriate interface for the new operator. A repartition can be
> > triggered on just a KTable. Here's where some examples would help.
> Perhaps
> > we could focus on these three cases:
> >
> > A. kstream.groupBy(...).aggregate(...)
> > B. ktable.groupBy(...).aggregate(...)
> > C. kstream.selectKey(...).join(ktable)
> >
> > I'm sure someone will correct me if I'm missing any additional vital
> > examples, but at the very least, these are the three to consider: either
> a
> > KStream or KTable followed by a groupBy/aggregation, or a KStream with
> > key-changing operator followed by a join. Note that you could have
> > something like KStream.selectKey(...).toTable().join(...) as well, but
> > since there are no pure key-changing operators (like #selectKey) on
> > KTables, only groupBy() which must always be followed by aggregation,
> this
> > 4th case can be reduced to an example like C of a KStream with
> 

Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-07-21 Thread Sophie Blee-Goldman
I agree with everything Almog said above, and will just add on to two
points:

1. Regarding whether to block this KIP on the completion of any or all
future implementations of in-memory version stores (or persist suppression
buffers), I feel that would be unfair to this feature which is completely
unrelated to the semantic improvements offered by versioned state stores.
It seems like the responsibility of those driving the versioned state
stores feature, not Almog/this KIP, to make sure that those bases are
covered. Further, if anything, this KIP will help with the massive
proliferation of StoreSuppliers on the Stores factory class, and provide
users with a much easier way to leverage the versioned stores without
having to muck around directly with the StoreSuppliers.

I also thought about it a bit, and really like Almog's suggestion to
introduce an additional StoreSpec for the Versioned state stores. Obviously
we can add the RocksDB one to this KIP now, and then as he mentioned,
there's an easy way to get users onto the IMVersionedStateStore types once
they are completed.

Lastly, on this note, I want to point out how smoothly this solved the
issue of timestamped stores, which are intended to be the DSL default but
are only a special case for RocksDB. Right now it can be confusing for a
user scrolling through the endless Stores class and seeing a timestamped
version of the persistent but not in-memory stores. One could easily assume
there was no timestamped option for IM stores and that this feature was
incomplete, if they weren't acutely aware of the internal implementation
details (namely that it's only required for RocksDB for compatibility
reasons). However, with this KIP, all that is handled completely
transparently to the user, and we the devs, who *are* aware of the internal
implementation details, are now appropriately the ones responsible for
handing the correct store type to a particular operator. While versioned
state stores may not be completely comparable, depending on whether we want
users to remain able to easily choose between using them or not (vs
timestamped which should be used by all), I still feel this KIP is a great
step in the right direction that not only should not be blocked on the
completion of the IM implementations, but in fact should specifically be
done first as it enables an easier way to utilize those IM versioned
stores. Just my 2 cents :)

2. The idea to expand the existing the config with a CUSTOM enum without
introducing another config to specify the CUSTOM store spec does not seem
appropriate, or  even possible (for the reasons Almog mentioned above about
config types, though perhaps there is a way I'm not seeing). I do not buy
the argument that we should optimize the API to make it easy for users who
just want to switch to all in-memory stores, as I truly believe this is a
very small fraction of the potential userbase of this feature (anyone who's
actually using this should please chime in!). It seems very likely that the
majority of users of this feature are actually those with custom state
stores, as to my knowledge, this has been the case any/every time this
feature was requested by users.

That said, while I don't see any way to get around introducing a new
config, I don't personally have a preference w.r.t whether to keep around
the old config or deprecate it. I simply don't get the impression it is
very heavily used as it stands today, while it only works for those who
want all in-memory stores. Again, input from actual users would be very
valuable here. In the absence of that data, I will just point to the fact
that this KIP was proposed by a Streams dev (you :P), abandoned, picked up
by another Streams dev, and finally implemented without ever hearing from a
user that they would find this useful. This is not to disparage the
original KIP, but just to say again, as I stated back then, it seemed like
a major opportunity loss to leave out custom state stores

Cheers,
Sophie

On Fri, Jul 21, 2023 at 1:52 PM Almog Gavra  wrote:

> Thanks for all the feedback folk! Responses inline.
>
> > Basically, I'm suggesting two things: first, call out in some way
> (perhaps the StoreTypeSpec javadocs) that each StoreTypeSpec is considered
> a public contract in itself and should outline any semantic guarantees it
> does, or does not, make. Second, we should add a note on ordering
> guarantees in the two OOTB specs: for RocksDB we assert that range queries
> will honor serialized byte ordering, whereas the InMemory flavor gives no
> ordering guarantee whatsoever at this time.
>
> That makes sense to me Sophie! I'll make the changes to the KIP. And @Colt,
> yes I believe that would be the new javadoc for the generic
> ReadOnlyKeyValueStore.
>
> > However, I am wondering if we should close others gaps first?
>
> @Matthias, thanks for the review and thoughts! I think we should separate
> closing other gaps in the product from providing this as useful
> functionality to avoid 

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-21 Thread Matthias J. Sax

Some thought about the API question.



A. kstream.groupBy(...).aggregate(...)


This can be re-writtten as

kstream.selectKey(...)
   .markAsRepartitioned()
   .groupByKey()
   .aggregate()

Given that `markAsRepartitoned` is an advanced feature, I think it would 
be ok?




B. ktable.groupBy(...).aggregate(...)


For KTable aggregation, not sure how useful it would be? In the end, an 
table aggregation does only make sense if we pick something from the 
value, ie, we indeed change the key?




C. kstream.selectKey(...).join(ktable)


We can just insert a `markAsRepartitioned()` after `selectKey` to avoid 
repartitioning of the left input KStream.




KStream.selectKey(...).toTable().join(...)


Not sure if I understand what you try to say with this example? In the 
end, `selectKey(...).toTable()` would repartiton. If I know that one can 
upsert directly, one inserts a `markAsRepartitioned()` in between.



In general, the use case seems to be that the key is not in the right 
"format", or there is no key, but data was partitioned by a 
value-attribute upstream and we just want to extract this 
value-attribute into the key. Both seems to be KStream cases?



-Matthias



On 7/15/23 1:43 PM, Sophie Blee-Goldman wrote:

Hey Shay, while I don't have any specific concerns about the new public API
in this KIP, I'd like to better understand how this feature will work
before I vote. We should document the behavior of this new operator clearly
in the KIP as well -- you don't necessarily need to write the complete
javadocs up front, but it should be possible for a user to read the KIP and
then understand how this feature will work and how they would need to apply
it.

To that end, I recommend framing this proposal with a few examples to help
clarify the semantics. When and where can you apply the markAsPartitioned()
operator? Some suggestions below.

Specific notes:

1. The KIP opens with "Each key changing operation in Kafka Streams
(selectKey, map, transform, etc.) now leads to automatic repartition before
an aggregation." We should change "aggregation" to "stateful operation" as
this is true for things like joins as well as aggregations
2. The callout on IQ makes me a bit uncomfortable -- basically it says this
should not be a concern "if we use markAsPartitioned correctly". Does this
mean if we, the devs implementing this, write the feature correctly? Or is
it saying that this won't be a problem as long as "we", the users of this
feature, use it correctly"? Just wondering if you've put any thought into
how this would work yet (I personally have not)
3. The KIP should lay out the proposed API exactly, even if there's only
one new method. Check out this KIP

(or this KIP
)
for a good reference on what the Public Interfaces section should include
4. Regarding the proposed API itself, I wonder if KStream is really the
most appropriate interface for the new operator. A repartition can be
triggered on just a KTable. Here's where some examples would help. Perhaps
we could focus on these three cases:

A. kstream.groupBy(...).aggregate(...)
B. ktable.groupBy(...).aggregate(...)
C. kstream.selectKey(...).join(ktable)

I'm sure someone will correct me if I'm missing any additional vital
examples, but at the very least, these are the three to consider: either a
KStream or KTable followed by a groupBy/aggregation, or a KStream with
key-changing operator followed by a join. Note that you could have
something like KStream.selectKey(...).toTable().join(...) as well, but
since there are no pure key-changing operators (like #selectKey) on
KTables, only groupBy() which must always be followed by aggregation, this
4th case can be reduced to an example like C of a KStream with key-changing
operation and downstream join -- ie there's no way to do this without
#toTable which is more like syntactic sugar for the purposes of this
repartitioning discussion.

I worry that making this a DSL operator on KStream is too generic, and we
would also need to add it to KTable for example B, despite KTables not
having any true pure key-changing operators outside of #groupBy. Would we
throw an exception if you invoked #markAsPartitioned on a KTable that
wasn't followed by a groupBy? If you have multiple key-changing operators,
would you need to add markAsPartitioned after each one? If not, what are
the semantics of that?  These are the main questions that got me thinking
here, and will definitely need to be clarified in the KIP if we do go with
the current proposal. But I wanted to throw out another idea for an API I
think would help with some of this awkwardness by having clearly defined
semantics:

Fundamentally it seems to me that these issues are arising from that "being
partitioned" is conceptually a property of other operations applied to a

[jira] [Resolved] (KAFKA-14950) Implement assign() and assignment()

2023-07-21 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-14950.
-
Fix Version/s: 3.6.0
   Resolution: Fixed

merged the PR to trunk.

> Implement assign() and assignment()
> ---
>
> Key: KAFKA-14950
> URL: https://issues.apache.org/jira/browse/KAFKA-14950
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-945
> Fix For: 3.6.0
>
>
> Implement assign() and assignment()



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-07-21 Thread Almog Gavra
Thanks for all the feedback folk! Responses inline.

> Basically, I'm suggesting two things: first, call out in some way
(perhaps the StoreTypeSpec javadocs) that each StoreTypeSpec is considered
a public contract in itself and should outline any semantic guarantees it
does, or does not, make. Second, we should add a note on ordering
guarantees in the two OOTB specs: for RocksDB we assert that range queries
will honor serialized byte ordering, whereas the InMemory flavor gives no
ordering guarantee whatsoever at this time.

That makes sense to me Sophie! I'll make the changes to the KIP. And @Colt,
yes I believe that would be the new javadoc for the generic
ReadOnlyKeyValueStore.

> However, I am wondering if we should close others gaps first?

@Matthias, thanks for the review and thoughts! I think we should separate
closing other gaps in the product from providing this as useful
functionality to avoid feature creep so long as the API proposed here will
be suitable for when we want to close those implementation gaps! My general
proposal is that for things that are not customizable today by
default.dsl.store they remain not customizable after this KIP. The good
news is, however, that there's no reason why this cannot be extended to
cover those in the future if we want to - see specifics below.

Comments on the specifics below

> In particular, this holds for the new versioned-store ... Should
versioned stores also be covered by the KIP

Is there a reason why we can't introduce a VersionedRocksDBStoreTypeSpec
and if we ever support an in-memory an equivalent
VersionedInMemoryRocksDBStoreTypeSpec? If so, then there would not need to
be any additional changes to the API proposed in this KIP.

> For `suppress()` it's actually other way around we only have an in-memory
implementation. Do you aim to allow custom stores for `suppress()`, too?

We have three options here:
1) we can decide to maintain existing behavior and use the in-memory
implementation for all stores (not even going through the API at all)
2a) we can introduce a new parameter to the KeyValueParams class (boolean
isTimeOrderedBuffer or something like that) and return an in-memory store
in the implementation of RocksDBStoreTypeSpec (this maintains the existing
behavior, and would allow us in the future to make the change to return a
RocksDB store if we ever provide one)
2b) same as 2a but we throw an exception if the requested store type does
not support that (this is backwards incompatible, and since ROCKS_DB is the
default we probably shouldn't do this)

My proposal for now is 1) because as of KIP-825
EmitStrategy#ON_WINDOW_CLOSE is the preferred way of suppressing and that
is accounted for in this API already.

> Last, I am not sure if the new parameter replacing the existing one is the
best way to go?

I'm happy either way, just let me know which you prefer - the discussion
around CUSTOM is in the rejected alternatives but I'm happy to differ to
whatever the project conventions are :)

> If it's matches existing `ROCKS_DB` or `IN_MEMORY` we just process it as we
do know, and if know we assume it's a fully qualified class name and try to
instantiate it?

Note that there is no functionality for this kind of thing in
AbstractConfig (it's either a String validated enum or a class) so this
would be a departure from convention. Again, I'm happy to implement that if
it's preferred.

> Also wondering how it would related to the existing `Stores` factory?

StoreTypeSpec will depend on Stores factory - they're one layer removed.
You can imagine that StoreTypeSpec is just a grouping of methods from the
Stores factory into a convenient package for default configuration purposes.

Thanks again for all the detailed thoughts Matthias!

On Fri, Jul 21, 2023 at 11:50 AM Matthias J. Sax  wrote:

> Thanks for the KIP. Overall I like the idea to close this gap.
>
> However, I am wondering if we should close others gaps first? In
> particular, IIRC, we have a few cases for which we only have a RocksDB
> implementation for a store, and thus, adding an in-memory version for
> these stores first, to make the current `IN_MEMORY` parameter work,
> might be the first step?
>
> In particular, this holds for the new versioned-store (but I actually
> believe the is some other internal store with no in-memory
> implementation). -- For `suppress()` it's actually other way around we
> we only have an in-memory implementation. Do you aim to allow custom
> stores for `suppress()`, too?
>
> Btw: Should versioned stores also be covered by the KIP (ie,
> `StoreTypeSpec`)? We did consider to add a new option `VERSIONED` to the
> existing `default.dsl.store` config, but opted out for various reasons.
>
> Last, I am not sure if the new parameter replacing the existing one is
> the best way to go? Did you put the idea to add `CUSTOM` to the existing
> config into rejected alternative. Personally, I would prefer to add
> `CUSTOM` as I would like to optimize to easy of use for the 

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-07-21 Thread Nick Telford
One more thing: I noted John's suggestion in the KIP, under "Rejected
Alternatives". I still think it's an idea worth pursuing, but I believe
that it's out of the scope of this KIP, because it solves a different set
of problems to this KIP, and the scope of this one has already grown quite
large!

On Fri, 21 Jul 2023 at 21:33, Nick Telford  wrote:

> Hi everyone,
>
> I've updated the KIP (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores)
> with the latest changes; mostly bringing back "Atomic Checkpointing" (for
> what feels like the 10th time!). I think the one thing missing is some
> changes to metrics (notably the store "flush" metrics will need to be
> renamed to "commit").
>
> The reason I brought back Atomic Checkpointing was to decouple store flush
> from store commit. This is important, because with Transactional
> StateStores, we now need to call "flush" on *every* Task commit, and not
> just when the StateStore is closing, otherwise our transaction buffer will
> never be written and persisted, instead growing unbounded! I experimented
> with some simple solutions, like forcing a store flush whenever the
> transaction buffer was likely to exceed its configured size, but this was
> brittle: it prevented the transaction buffer from being configured to be
> unbounded, and it still would have required explicit flushes of RocksDB,
> yielding sub-optimal performance and memory utilization.
>
> I deemed Atomic Checkpointing to be the "right" way to resolve this
> problem. By ensuring that the changelog offsets that correspond to the most
> recently written records are always atomically written to the StateStore
> (by writing them to the same transaction buffer), we can avoid forcibly
> flushing the RocksDB memtables to disk, letting RocksDB flush them only
> when necessary, without losing any of our consistency guarantees. See the
> updated KIP for more info.
>
> I have fully implemented these changes, although I'm still not entirely
> happy with the implementation for segmented StateStores, so I plan to
> refactor that. Despite that, all tests pass. If you'd like to try out or
> review this highly experimental and incomplete branch, it's available here:
> https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0. Note: it's built
> against Kafka 3.5.0 so that I had a stable base to build and test it on,
> and to enable easy apples-to-apples comparisons in a live environment. I
> plan to rebase it against trunk once it's nearer completion and has been
> proven on our main application.
>
> I would really appreciate help in reviewing and testing:
> - Segmented (Versioned, Session and Window) stores
> - Global stores
>
> As I do not currently use either of these, so my primary test environment
> doesn't test these areas.
>
> I'm going on Parental Leave starting next week for a few weeks, so will
> not have time to move this forward until late August. That said, your
> feedback is welcome and appreciated, I just won't be able to respond as
> quickly as usual.
>
> Regards,
> Nick
>
> On Mon, 3 Jul 2023 at 16:23, Nick Telford  wrote:
>
>> Hi Bruno
>>
>> Yes, that's correct, although the impact on IQ is not something I had
>> considered.
>>
>> What about atomically updating the state store from the transaction
>>> buffer every commit interval and writing the checkpoint (thus, flushing
>>> the memtable) every configured amount of data and/or number of commit
>>> intervals?
>>>
>>
>> I'm not quite sure I follow. Are you suggesting that we add an additional
>> config for the max number of commit intervals between checkpoints? That
>> way, we would checkpoint *either* when the transaction buffers are nearly
>> full, *OR* whenever a certain number of commit intervals have elapsed,
>> whichever comes first?
>>
>> That certainly seems reasonable, although this re-ignites an earlier
>> debate about whether a config should be measured in "number of commit
>> intervals", instead of just an absolute time.
>>
>> FWIW, I realised that this issue is the reason I was pursuing the Atomic
>> Checkpoints, as it de-couples memtable flush from checkpointing, which
>> enables us to just checkpoint on every commit without any performance
>> impact. Atomic Checkpointing is definitely the "best" solution, but I'm not
>> sure if this is enough to bring it back into this KIP.
>>
>> I'm currently working on moving all the transactional logic directly into
>> RocksDBStore itself, which does away with the StateStore#newTransaction
>> method, and reduces the number of new classes introduced, significantly
>> reducing the complexity. If it works, and the complexity is drastically
>> reduced, I may try bringing back Atomic Checkpoints into this KIP.
>>
>> Regards,
>> Nick
>>
>> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna  wrote:
>>
>>> Hi Nick,
>>>
>>> Thanks for the insights! Very interesting!
>>>
>>> As far as I understand, you want to atomically update the state store
>>> from the 

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-07-21 Thread Nick Telford
Hi everyone,

I've updated the KIP (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores)
with the latest changes; mostly bringing back "Atomic Checkpointing" (for
what feels like the 10th time!). I think the one thing missing is some
changes to metrics (notably the store "flush" metrics will need to be
renamed to "commit").

The reason I brought back Atomic Checkpointing was to decouple store flush
from store commit. This is important, because with Transactional
StateStores, we now need to call "flush" on *every* Task commit, and not
just when the StateStore is closing, otherwise our transaction buffer will
never be written and persisted, instead growing unbounded! I experimented
with some simple solutions, like forcing a store flush whenever the
transaction buffer was likely to exceed its configured size, but this was
brittle: it prevented the transaction buffer from being configured to be
unbounded, and it still would have required explicit flushes of RocksDB,
yielding sub-optimal performance and memory utilization.

I deemed Atomic Checkpointing to be the "right" way to resolve this
problem. By ensuring that the changelog offsets that correspond to the most
recently written records are always atomically written to the StateStore
(by writing them to the same transaction buffer), we can avoid forcibly
flushing the RocksDB memtables to disk, letting RocksDB flush them only
when necessary, without losing any of our consistency guarantees. See the
updated KIP for more info.

I have fully implemented these changes, although I'm still not entirely
happy with the implementation for segmented StateStores, so I plan to
refactor that. Despite that, all tests pass. If you'd like to try out or
review this highly experimental and incomplete branch, it's available here:
https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0. Note: it's built
against Kafka 3.5.0 so that I had a stable base to build and test it on,
and to enable easy apples-to-apples comparisons in a live environment. I
plan to rebase it against trunk once it's nearer completion and has been
proven on our main application.

I would really appreciate help in reviewing and testing:
- Segmented (Versioned, Session and Window) stores
- Global stores

As I do not currently use either of these, so my primary test environment
doesn't test these areas.

I'm going on Parental Leave starting next week for a few weeks, so will not
have time to move this forward until late August. That said, your feedback
is welcome and appreciated, I just won't be able to respond as quickly as
usual.

Regards,
Nick

On Mon, 3 Jul 2023 at 16:23, Nick Telford  wrote:

> Hi Bruno
>
> Yes, that's correct, although the impact on IQ is not something I had
> considered.
>
> What about atomically updating the state store from the transaction
>> buffer every commit interval and writing the checkpoint (thus, flushing
>> the memtable) every configured amount of data and/or number of commit
>> intervals?
>>
>
> I'm not quite sure I follow. Are you suggesting that we add an additional
> config for the max number of commit intervals between checkpoints? That
> way, we would checkpoint *either* when the transaction buffers are nearly
> full, *OR* whenever a certain number of commit intervals have elapsed,
> whichever comes first?
>
> That certainly seems reasonable, although this re-ignites an earlier
> debate about whether a config should be measured in "number of commit
> intervals", instead of just an absolute time.
>
> FWIW, I realised that this issue is the reason I was pursuing the Atomic
> Checkpoints, as it de-couples memtable flush from checkpointing, which
> enables us to just checkpoint on every commit without any performance
> impact. Atomic Checkpointing is definitely the "best" solution, but I'm not
> sure if this is enough to bring it back into this KIP.
>
> I'm currently working on moving all the transactional logic directly into
> RocksDBStore itself, which does away with the StateStore#newTransaction
> method, and reduces the number of new classes introduced, significantly
> reducing the complexity. If it works, and the complexity is drastically
> reduced, I may try bringing back Atomic Checkpoints into this KIP.
>
> Regards,
> Nick
>
> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna  wrote:
>
>> Hi Nick,
>>
>> Thanks for the insights! Very interesting!
>>
>> As far as I understand, you want to atomically update the state store
>> from the transaction buffer, flush the memtable of a state store and
>> write the checkpoint not after the commit time elapsed but after the
>> transaction buffer reached a size that would lead to exceeding
>> statestore.transaction.buffer.max.bytes before the next commit interval
>> ends.
>> That means, the Kafka transaction would commit every commit interval but
>> the state store will only be atomically updated roughly every
>> statestore.transaction.buffer.max.bytes of data. Also IQ would then only
>> 

[DISCUSS] KIP-950: Tiered Storage Disablement

2023-07-21 Thread Beyene, Mehari
Hi everyone,
I would like to start a discussion on KIP-950: Tiered Storage Disablement 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement).

This KIP proposes adding the ability to disable and re-enable tiered storage on 
a topic.

Thanks,
Mehari


Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2025

2023-07-21 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 392388 lines...]

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
RestoreIntegrationTest > shouldSuccessfullyStartWhenLoggingDisabled(boolean) > 
[2] false STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
RestoreIntegrationTest > shouldSuccessfullyStartWhenLoggingDisabled(boolean) > 
[2] false PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [1] 
true STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [1] 
true PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [2] 
false STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [2] 
false PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
SmokeTestDriverIntegrationTest > shouldWorkWithRebalance(boolean) > [1] true 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > 
shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(boolean) > 
[2] false PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > 
shouldProcessDataFromStoresWithLoggingDisabled(boolean) > [1] true STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > 
shouldProcessDataFromStoresWithLoggingDisabled(boolean) > [1] true PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > 
shouldProcessDataFromStoresWithLoggingDisabled(boolean) > [2] false STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > 
shouldProcessDataFromStoresWithLoggingDisabled(boolean) > [2] false PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreNullRecord() STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreNullRecord() PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreStateFromSourceTopic(boolean) > [1] true 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreStateFromSourceTopic(boolean) > [1] true 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreStateFromSourceTopic(boolean) > [2] false 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreStateFromSourceTopic(boolean) > [2] false 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldSuccessfullyStartWhenLoggingDisabled(boolean) > 
[1] true STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldSuccessfullyStartWhenLoggingDisabled(boolean) > 
[1] true PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldSuccessfullyStartWhenLoggingDisabled(boolean) > 
[2] false STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldSuccessfullyStartWhenLoggingDisabled(boolean) > 
[2] false PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [1] 
true STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [1] 
true PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [2] 
false STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
SmokeTestDriverIntegrationTest > shouldWorkWithRebalance(boolean) > [1] true 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 
SmokeTestDriverIntegrationTest > shouldWorkWithRebalance(boolean) > [2] false 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
RestoreIntegrationTest > shouldRestoreStateFromChangelogTopic(boolean) > [2] 
false PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 182 > 
SmokeTestDriverIntegrationTest > shouldWorkWithRebalance(boolean) > [1] true 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 181 > 

Re: KIP-919: Allow AdminClient to Talk Directly with the KRaft Controller Quorum

2023-07-21 Thread Colin McCabe
On Fri, Jul 21, 2023, at 09:43, José Armando García Sancio wrote:
> Thanks for the KIP Colin. Apologies if some of these points have
> already been made. I have not followed the discussion closely:
>
> 1. Re: Periodically, each controller will check that the controller
> registration for its ID is as expected
>
> Does this need to be periodic? Can't the controller schedule this RPC,
> retry etc, when it finds that the incarnation ID doesn't match its
> own?
>

Hi José,

Thanks for the reviews.

David had the same question. I agree that it should be event-driven rather than 
periodic (except for retries, etc.)

>
> 2. Did you consider including the active controller's epoch in the
> ControllerRegistrationRequest?
>
> This would allow the active controller to reject registration from
> controllers that are not part of the active quorum and don't know the
> latest controller epoch. This should mitigate some of the concerns you
> raised in bullet point 1.
>

Good idea. I will add the active controller epoch to the registration request.

>
> 3. Which endpoint will the inactive controllers use to send the
> ControllerRegistrationRequest?
>
> Will it use the first endpoint described in the cluster metadata
> controller registration record? Or would it use the endpoint described
> in the server configuration at controller.quorum.voters?
>

They will use the endpoint in controller.quorum.voters. In general, the 
endpoints from the registration are only used for responding to 
DESCRIBE_CLUSTER. Since, after all, we may not even have the registration 
endpoints when we start up.

>
> 4. Re: Raft integration in the rejected alternatives
>
> Yes, The KRaft layer needs to solve a similar problem like endpoint
> discovery to support dynamic controller membership change. As you
> point out the requirements are different and the set of information
> that needs to be tracked is different. I think it is okay to use a
> different solution for each of these problems.

Yeah that was my feeling too. Thanks for taking a look.

regards,
Colin

>
> Thanks!
> -- 
> -José


Re: KIP-919: Allow AdminClient to Talk Directly with the KRaft Controller Quorum

2023-07-21 Thread Colin McCabe
On Fri, Jul 21, 2023, at 08:14, David Arthur wrote:
> Hey Colin, thanks for the KIP! Some questions
>

Hi David,

Thanks for reviewing.

> 1) "This registration will include information about the endpoints which
> they possess"  Will this include all endpoints, or only those configured in
> "advertised.listeners"
>

The registration information includes all controller endpoints. Keep in mind, 
it is not valid for any controller listeners to appear in advertised.listeners.

>
> 2) "Periodically, each controller will check that the controller
> registration for its ID is as expected."  Does this need to be a periodic
> check? Since the controller registration state will be in the log, can't
> the follower just react to unexpected incarnation IDs (after it's caught
> up)?
>

I think it will hang off of a metadata publisher. You are right that it can 
mostly be event-driven. See a metadata update that overwrites your registration 
=> act to re-register.

The "periodic" stuff comes in in cases where we fail to register and have to 
try again. I'll try to clarify the wording.

>
> 3) ControllerRegistrationRequest has a typo in the listeners section (it
> mentions "broker")
>

Fixed

>
> 4) Since we can't rely on the ApiVersions data, should we remove the field
> we added to ApiVersionsResponse in KIP-866?
>

Yes, this is a good point. I'll mark it as deprecated.

>
> 5)I filed https://issues.apache.org/jira/browse/KAFKA-15230 for the issues
> mentioned under "Controller Changes" in case you want to link it
>

Added

>
> 6) I don't see it explicitly mentioned, but I think it's the case that the
> active controller must accept and persist any controller registration it
> receives. This is unlike the behavior of broker registrations where we can
> reject brokers we don't want. For controllers, I don't think we have that
> option unless we go for some tighter Raft integration. Since the followers
> must be participating in Raft to learn about the leader (and therefore,
> will have replayed the full log), we can't really say "no" at that point.
>

Agreed. I added some wording to this effect in the 
ControllerRegistrationRequest  section. Also specified that we can return 
NOT_CONTROLLER from this API, when not active.

cheers,
Colin
 
>
> Cheers,
> David
>
>
> On Thu, Jul 20, 2023 at 7:23 PM Colin McCabe  wrote:
>
>> On Tue, Jul 18, 2023, at 09:30, Mickael Maison wrote:
>> > H Colin,
>> >
>> > Thanks for the KIP.
>> >
>> > Just a few points:
>> > 1. As Tom mentioned it would be good to clarify the APIs we expect
>> > available on controllers. I assume we want to add DESCRIBE_CONFIGS as
>> > part of this KIP.
>>
>> Hi Mickael,
>>
>> Yes, this is a good point. I added a table describing the APIs that will
>> now be added.
>>
>> > 2. Currently we have no way of retrieving the list of configs that
>> > apply to controllers. It would be good to have an object, so we can
>> > add that to the docs but also use that in kafka-configs.
>>
>> I think this is out of scope.
>>
>> > 3. Should we have a new entity-type in kafka-configs for setting
>> > controller configs?
>>
>> The BROKER entity type already applies to controllers. It probably needs a
>> new name (NODE would be better) but that's out of scope for this KIP, I
>> think.
>>
>> best,
>> Colin
>>
>>
>> >
>> > Thanks,
>> > Mickael
>> >
>> > On Tue, Jul 4, 2023 at 2:20 PM Luke Chen  wrote:
>> >>
>> >> Hi Colin,
>> >>
>> >> Thanks for the answers to my previous questions.
>> >>
>> >> > Yes, the common thread here is that all of these shell commands
>> perform
>> >> operations can be done without the broker. So it's reasonable to allow
>> them
>> >> to be done without going through the broker. I don't know if we need a
>> >> separate note for each since the rationale is really the same for all
>> (is
>> >> it reasonable? if so allow it.)
>> >>
>> >> Yes, it makes sense. Could we make a note about the main rationale for
>> >> selecting these command-line tools in the KIP to make it clear?
>> >> Ex: The following command-line tools will get a new
>> --bootstrap-controllers
>> >> argument (because these shell commands perform operations can be done
>> >> without the broker):
>> >>
>> >> > kafka-reassign-partitions.sh cannot be used to move the
>> >> __cluster_metadata topic. However, it can be used to move partitions
>> that
>> >> reside on the brokers, even when using --bootstrap-controllers to talk
>> >> directly to the quorum.
>> >>
>> >> Fair enough.
>> >>
>> >>
>> >> 4. Does all the command-line tools with `--bootstrap-controllers`
>> support
>> >> all the options in the tool?
>> >> For example, kafka-configs.sh, In addition to the `--alter` option you
>> >> mentioned in the example, do we also support `--describe` or `--delete`
>> >> option?
>> >> If so, do we also support setting "quota" for users/clients/topics...
>> via
>> >> `--bootstrap-controllers`? (not intuitive, but maybe we just directly
>> >> commit the change into the metadata from controller?)
>> >>
>> >> 

Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!

2023-07-21 Thread Matthias J. Sax
I am not a clients (or threading) expert, but I tend to agree to Colin's 
concerns.


In particular, it would be nice to see an example how you intent to use 
the API (I am not familiar with Kotlin or it's co-routins), to better 
understand what this changes help to solve to begin with.


Opening up the consumer sounds potentially dangerous and we should 
weight opportunity and risk before making a decision. So far, I see 
risks but do not understand the opportunity you are after.



-Matthias

On 7/14/23 11:43 AM, Kirk True wrote:

Hi Erik,

Thanks for the KIP!

I empathize with your frustration over the radio silence. It gets like that 
sometimes, and I apologize for my lack of feedback.

I’d personally like to see this lively exchange move over to the DISCUSS thread 
you’d created before.

Thanks,
Kirk


On Jul 14, 2023, at 1:33 AM, Erik van Oosten  
wrote:

Hi Colin,

The way I understood Philp's message is that KIP-944 also plays nice with 
KIP-945. But I might be mistaken.

Regardless, KIP-945 does /not/ resolve the underlying problem (the need for 
nested consumer invocations) because it has the explicit goal of not changing 
the user facing API.


... KIP-945 but haven't posted a DISCUSS thread yet


There is a thread called 'KafkaConsumer refactor proposal', but indeed no 
official discussion yet.


I really don't want to be debugging complex interactions between Java 
thread-local variables and green threads.


In that email thread, I proposed an API change in which callbacks are no longer 
needed. The proposal completely removes the need for such complex interactions. 
In addition, it gives clients the ability to process at full speed even while a 
coorperative rebalance is ongoing.

Regards,
 Erik.

Op 14-07-2023 om 00:36 schreef Colin McCabe:

HI Philip & Erik,

Hmm... if we agree that KIP-945 addresses this use case, I think it would be 
better to just focus on that KIP. Fundamentally it's a better and cleaner model 
than a complex scheme involving thread-local variables. I really don't want to 
be debugging complex interactions between Java thread-local variables and green 
threads.

It also generally helps to have some use-cases in mind when writing these 
things. If we get feedback about what would be useful for async runtimes, that 
would probably help improve and focus KIP-945. By the way, I can see you have a 
draft on the wiki for KIP-945 but haven't posted a DISCUSS thread yet, so I 
assume it's not ready for review yet ;)

best,
Colin


On Tue, Jul 11, 2023, at 12:24, Philip Nee wrote:

Hey Erik - Another thing I want to add to my comment is.  We are in-process
of re-writing the KafkaConsumer, and I think your proposal would work in
the new consumer because we are going to separate the user thread and the
background thread.  Here is the 1-pager, and we are in process of
converting this in to KIP-945.

Thanks,
P

On Tue, Jul 11, 2023 at 10:33 AM Philip Nee  wrote:


Hey Erik,

Sorry for holding up this email for a few days since Colin's response
includes some of my concerns.  I'm in favor of this KIP, and I think your
approach seems safe.  Of course, I probably missed something therefore I
think this KIP needs to cover different use cases to demonstrate it doesn't
cause any unsafe access. I think this can be demonstrated via diagrams and
some code in the KIP.

Thanks,
P

On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten
 wrote:


Hello Colin,

  >> In KIP-944, the callback thread can only delegate to another thread
after reading from and writing to a threadlocal variable, providing the
barriers right there.

  > I don't see any documentation that accessing thread local variables
provides a total store or load barrier. Do you have such documentation?
It seems like if this were the case, we could eliminate volatile
variables from most of the code base.

Now I was imprecise. The thread-locals are only somewhat involved. In
the KIP proposal the callback thread reads an access key from a
thread-local variable. It then needs to pass that access key to another
thread, which then can set it on its own thread-local variable. The act
of passing a value from one thread to another implies that a memory
barrier needs to be passed. However, this is all not so relevant since
there is no need to pass the access key back when the other thread is
done.

But now I think about it a bit more, the locking mechanism runs in a
synchronized block. If I remember correctly this should be enough to
pass read and write barriers.

  >> In the current implementation the consumer is also invoked from
random threads. If it works now, it should continue to work.
  > I'm not sure what you're referring to. Can you expand on this?

Any invocation of the consumer (e.g. method poll) is not from a thread
managed by the consumer. This is what I was assuming you meant with the
term 'random thread'.

  > Hmm, not sure what you mean by "cooperate with blocking code." If you
have 10 green threads you're multiplexing on to one 

Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-07-21 Thread Matthias J. Sax

Thanks for the KIP. Overall I like the idea to close this gap.

However, I am wondering if we should close others gaps first? In 
particular, IIRC, we have a few cases for which we only have a RocksDB 
implementation for a store, and thus, adding an in-memory version for 
these stores first, to make the current `IN_MEMORY` parameter work, 
might be the first step?


In particular, this holds for the new versioned-store (but I actually 
believe the is some other internal store with no in-memory 
implementation). -- For `suppress()` it's actually other way around we 
we only have an in-memory implementation. Do you aim to allow custom 
stores for `suppress()`, too?


Btw: Should versioned stores also be covered by the KIP (ie, 
`StoreTypeSpec`)? We did consider to add a new option `VERSIONED` to the 
existing `default.dsl.store` config, but opted out for various reasons.


Last, I am not sure if the new parameter replacing the existing one is 
the best way to go? Did you put the idea to add `CUSTOM` to the existing 
config into rejected alternative. Personally, I would prefer to add 
`CUSTOM` as I would like to optimize to easy of use for the majority of 
users (which don't implement a custom store), but only switch to 
in-memory sometimes. -- As an alternative, you would also just extend 
`dsl.default.store` (it's just a String) and allow to pass in anything. 
If it's matches existing `ROCKS_DB` or `IN_MEMORY` we just process it as 
we do know, and if know we assume it's a fully qualified class name and 
try to instantiate it? -- Given that we plan to keep the store-enum, is 
seems cleaner to keep the existing config and keep both the config and 
enum aligned to each other?



It's just preliminary thought. I will need to go back an take a more 
detailed look into the code to grok how the propose `StoreTypeSpec` 
falls into place. Also wondering how it would related to the existing 
`Stores` factory?


-Matthias


On 7/21/23 6:45 AM, Colt McNealy wrote:

Sophie—

Thanks for chiming in here. +1 to the idea of specifying the ordering
guarantees that we make in the StorageTypeSpec javadocs.

Quick question then. Is the javadoc that says:


Order is not guaranteed as bytes lexicographical ordering might not

represent key order.

no longer correct, and should say:


Order guarantees depend on the underlying implementation of the

ReadOnlyKeyValueStore. For more information, please consult the
[StorageTypeSpec javadocs]()

Thanks,
Colt McNealy

*Founder, LittleHorse.dev*


On Thu, Jul 20, 2023 at 9:28 PM Sophie Blee-Goldman 
wrote:


Hey Almog, first off, thanks for the KIP! I (and others) raised concerns
over how restrictive the default.dsl.store config would be if not
extendable to custom store types, especially given that this seems to be
the primary userbase of such a feature. At the time we didn't really have
any better ideas for a clean way to achieve that, but what you proposed
makes a lot of sense to me. Happy to see a good solution to this, and
hopefully others will share my satisfaction :P

I did have one quick piece of feedback which arose from an unrelated
question posed to the dev mailing list w/ subject line
"ReadOnlyKeyValueStore#range()
Semantics"
. I
recommend checking out the full thread for context, but it made me think
about how we can leverage the new StoreTypeSpec concept as an answer to the
long-standing question in Streams: where can we put guarantees of the
public contract for RocksDB (or other store implementations) when all the
RocksDB stuff is technically internal.

Basically, I'm suggesting two things: first, call out in some way (perhaps
the StoreTypeSpec javadocs) that each StoreTypeSpec is considered a public
contract in itself and should outline any semantic guarantees it does, or
does not, make. Second, we should add a note on ordering guarantees in the
two OOTB specs: for RocksDB we assert that range queries will honor
serialized byte ordering, whereas the InMemory flavor gives no ordering
guarantee whatsoever at this time.

Thoughts?

-Sophie

On Thu, Jul 20, 2023 at 4:28 PM Almog Gavra  wrote:


Hi All,

I would like to propose a KIP to expand support for default store types
(KIP-591) to encompass custom store implementations:



https://cwiki.apache.org/confluence/display/KAFKA/KIP-954%3A+expand+default+DSL+store+configuration+to+custom+types


Looking forward to your feedback!

Cheers,
Almog







Re: How to request review for pull request

2023-07-21 Thread Boudjelda Mohamed Said
Hello Wilma,

  This is a good contribution guide to start with,
https://kafka.apache.org/contributing

   Yes you can start a merge request, first ensures well it builds locally
and then the reviewers will check your merge request the of course you will
get a feedback if something went wrong, you can also check by yourself the
Jenkins ci builds to be sure things are ok



Thanks



On Fri 21 Jul 2023 at 20:17, Willma  wrote:

> Hi team, I just made my first pull request to the kafka repo. I wonder how
> can I request review for my pr, also do I need to have all 4 builds success
> before my pr can be reviewed? Thanks and looking forward to your reply.
>


RE: Re: [DISCUSS] KIP-951: Leader discovery optimisations for the client

2023-07-21 Thread Emanuele Sabellico
The downsides of bumping the version is that clients have to have all 
the latest features implemented before being able to benefit from this 
performance improvement.
One of the benefits of using a tagged field is to make the field 
available to previous versions too.

Choosing a minimum value for taggedVersions could be an alternative.

On 2023/07/13 17:30:45 Andrew Schofield wrote:
> Hi Mayank,
> If we bump the version, the broker can tell whether it’s worth 
providing the leader
> endpoint information to the client when the leader has changed. 
That’s my reasoning.

>
> Thanks,
> Andrew
>
> > On 13 Jul 2023, at 18:02, Mayank Shekhar Narula wrote:
> >
> > Thanks both for looking into this.
> >
> > Jose,
> >
> > 1/2 & 4(changes for PRODUCE) & 5 makes sense, will follow
> >
> > 3. If I understood this correctly, certain replicas "aren't" 
brokers, what

> > are they then?
> >
> > Also how about replacing "Replica" with "Leader", this is more 
readable on

> > the client. so, how about this?
> > { "name": "LeaderEndpoints", "type": "[]Leader", "versions": "15+",
> > "taggedVersions": "15+", "tag": 3,
> > "about": "Endpoints for all current leaders enumerated in
> > PartitionData.", "fields": [
> > { "name": "NodeId", "type": "int32", "versions": "15+",
> > "mapKey": true, "entityType": "brokerId", "about": "The ID of the
> > associated leader"},
> > { "name": "Host", "type": "string", "versions": "15+",
> > "about": "The leader's hostname." },
> > { "name": "Port", "type": "int32", "versions": "15+",
> > "about": "The leader's port." },
> > { "name": "Rack", "type": "string", "versions": "15+", "ignorable":
> > true, "default": "null",
> > "about": "The rack of the leader, or null if it has not been
> > assigned to a rack." }
> > ]}
> >
> > Andrew
> >
> > 6. I wonder if non-Kafka clients might benefit from not bumping the
> > version. If versions are bumped, say for FetchResponse to 16, I believe
> > that client would have to support all versions until 16 to fully 
utilise
> > this feature. Whereas, if not bumped, they can simply support until 
version
> > 12( will change to version:12 for tagged fields ), and non-AK 
clients can
> > then implement this feature. What do you think? I am inclined to 
not bump.

> >
> > On Thu, Jul 13, 2023 at 5:21 PM Andrew Schofield <
> > andrew_schofield_j...@outlook.com> wrote:
> >
> >> Hi José,
> >> Thanks. Sounds good.
> >>
> >> Andrew
> >>
> >>> On 13 Jul 2023, at 16:45, José Armando García Sancio
> >> wrote:
> >>>
> >>> Hi Andrew,
> >>>
> >>> On Thu, Jul 13, 2023 at 8:35 AM Andrew Schofield
> >>> wrote:
>  I have a question about José’s comment (2). I can see that it’s
> >> possible for multiple
>  partitions to change leadership to the same broker/node and it’s
> >> wasteful to repeat
>  all of the connection information for each topic-partition. But, I
> >> think it’s important to
>  know which partitions are now lead by which node. That 
information at

> >> least needs to be
>  per-partition I think. I may have misunderstood, but it sounded like
> >> your comment
>  suggestion lost that relationship.
> >>>
> >>> Each partition in both the FETCH response and the PRODUCE response
> >>> will have the CurrentLeader, the tuple leader id and leader epoch.
> >>> Clients can use this information to update their partition to leader
> >>> id and leader epoch mapping.
> >>>
> >>> They can also use the NodeEndpoints to update their mapping from
> >>> replica id to the tuple host, port and rack so that they can connect
> >>> to the correct node for future FETCH requests and PRODUCE requests.
> >>>
> >>> Thanks,
> >>> --
> >>> -José
> >>
> >>
> >
> > --
> > Regards,
> > Mayank Shekhar Narula
>
>

How to request review for pull request

2023-07-21 Thread Willma
Hi team, I just made my first pull request to the kafka repo. I wonder how
can I request review for my pr, also do I need to have all 4 builds success
before my pr can be reviewed? Thanks and looking forward to your reply.


Request contributor permissions to kafka

2023-07-21 Thread Boudjelda Mohamed Said
Hi
I would like to have a contributor permission to kafka, this is my jira
id: *bmscomp*


Best regards


[jira] [Created] (KAFKA-15233) Add public documentation for plugin.discovery migration steps

2023-07-21 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15233:
---

 Summary: Add public documentation for plugin.discovery migration 
steps
 Key: KAFKA-15233
 URL: https://issues.apache.org/jira/browse/KAFKA-15233
 Project: Kafka
  Issue Type: Task
  Components: docs, KafkaConnect
Reporter: Greg Harris
Assignee: Greg Harris
 Fix For: 3.6.0


The migration instructions in the KIP should be made more explicit and located 
in the public documentation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: KIP-919: Allow AdminClient to Talk Directly with the KRaft Controller Quorum

2023-07-21 Thread José Armando García Sancio
Thanks for the KIP Colin. Apologies if some of these points have
already been made. I have not followed the discussion closely:

1. Re: Periodically, each controller will check that the controller
registration for its ID is as expected

Does this need to be periodic? Can't the controller schedule this RPC,
retry etc, when it finds that the incarnation ID doesn't match its
own?

2. Did you consider including the active controller's epoch in the
ControllerRegistrationRequest?

This would allow the active controller to reject registration from
controllers that are not part of the active quorum and don't know the
latest controller epoch. This should mitigate some of the concerns you
raised in bullet point 1.

3. Which endpoint will the inactive controllers use to send the
ControllerRegistrationRequest?

Will it use the first endpoint described in the cluster metadata
controller registration record? Or would it use the endpoint described
in the server configuration at controller.quorum.voters?

4. Re: Raft integration in the rejected alternatives

Yes, The KRaft layer needs to solve a similar problem like endpoint
discovery to support dynamic controller membership change. As you
point out the requirements are different and the set of information
that needs to be tracked is different. I think it is okay to use a
different solution for each of these problems.

Thanks!
-- 
-José


Re: Re: [DISCUSS] KIP-951: Leader discovery optimisations for the client

2023-07-21 Thread Mayank Shekhar Narula
David

03. Fixed as well, to remove ignorable. In MetadataResponse, Rack came a
version later, hence was marked ignorable.

Thanks.

On Fri, Jul 21, 2023 at 1:38 PM Mayank Shekhar Narula <
mayanks.nar...@gmail.com> wrote:

> Hi David
>
>  01. My reasoning noted in the KIP is that CurrentLeader was first added
> in version 12, so 12 is the least version where clients could get these
> optimisations. So any client can now choose to implement this with version
> 12 of the protocol itself. If the version is bumped to X(say 16), then all
> clients(including non Java ones) will need to implement all versions from
> 13-16 to get these optimisations. And since the scenario of leader changing
> is not common, so bytes wasted on the network won't be much. What do you
> think?
>
> 02. For ProduceRequest, same reasoning. And corrected the taggedVersion.
>
> 03. Regarding making rack ignorable, my decision was inspired by a similar
> field in MetadataResponse. I must admit I am not clear, why was it made
> ignorable there originally?
>
>
> On Fri, Jul 21, 2023 at 11:14 AM David Jacot 
> wrote:
>
>> Hi Mayank,
>>
>> Thanks for the KIP. This is an interesting idea that I have been thinking
>> about for a long time so I am happy to see it. The gain is smaller than I
>> expected but still worth it in my opinion.
>>
>> 01. In the FetchResponse, what's the reason for using version `12+` for
>> the
>> new tagged field `NodeEndpoints`? Old clients won't use this field because
>> they are not aware of it. It seems to me that we are wasting bytes on the
>> wire by sending those unnecessarily. Is there a reason that I may have
>> missed? Intuitively, I would have bumped the version of the FetchRequest
>> to
>> 16 and I would have populated and sent `NodeEndpoints` only if version 16
>> is used by the client.
>>
>> 02. I have the very same question for the ProduceRequest's `CurrentLeader`
>> and `NodeEndpoints` fields. Note that the `taggedVerions` of
>> `NodeEndpoints` is incorrect.
>>
>> 03. For both, does the field `Rack` have to really be ignorable? That does
>> not seem necessary to me but I may be wrong.
>>
>> Best,
>> David
>>
>> On Fri, Jul 21, 2023 at 12:32 AM Crispin Bernier
>>  wrote:
>>
>> > Benchmark numbers have been posted on the KIP, please review.
>> >
>> > On 2023/07/20 13:03:00 Mayank Shekhar Narula wrote:
>> > > Jun
>> > >
>> > > Thanks for the feedback.
>> > >
>> > > Numbers to follow.
>> > >
>> > > If we don't plan to
>> > > > bump up the FetchResponse version, we could just remove the
>> reference
>> > to
>> > > > version 16.
>> > >
>> > > Fixed.
>> > >
>> > > On Thu, Jul 20, 2023 at 1:28 AM Jun Rao 
>> > wrote:
>> > >
>> > > > Hi, Mayank,
>> > > >
>> > > > Thanks for the KIP. I agree with others that it would be useful to
>> see
>> > the
>> > > > performance results. Otherwise, just a minor comment. If we don't
>> plan
>> > to
>> > > > bump up the FetchResponse version, we could just remove the
>> reference
>> > to
>> > > > version 16.
>> > > >
>> > > > Jun
>> > > >
>> > > > On Wed, Jul 19, 2023 at 2:31 PM Mayank Shekhar Narula <
>> > > > mayanks.nar...@gmail.com> wrote:
>> > > >
>> > > > > Luke
>> > > > >
>> > > > > Thanks for the interest in the KIP.
>> > > > >
>> > > > > But what if the consumer was fetching from the follower?
>> > > > >
>> > > > > We already include `PreferredReadReplica` in the fetch response.
>> > > > > > Should we put the node info of PreferredReadReplica under this
>> > case,
>> > > > > > instead of the leader's info?
>> > > > > >
>> > > > >
>> > > > > PreferredReadReplica is the decided on the leader. Looking at the
>> > Java
>> > > > > client code, AbstractFetch::selectReadReplica, first fetch request
>> > goes
>> > > > to
>> > > > > Leader of the partition -> Sends back PreferredReadReplica -> Next
>> > fetch
>> > > > > uses PreferredReadReplica. So as long as leader is available,
>> > > > > PreferredReadReplica would be found in subsequent fetches.
>> > > > >
>> > > > > Also, under this case, should we include the leader's info in the
>> > > > response?
>> > > > >
>> > > > >
>> > > > > In this case, I think the follower would fail the fetch if it
>> knows a
>> > > > > different leader. If the follower knows a newer leader, it would
>> > return
>> > > > new
>> > > > > leader information in the response, for the client to act on.
>> > > > >
>> > > > >
>> > > > > Will we include the leader/node info in the response when having
>> > > > > > `UNKNOWN_LEADER_EPOCH` error?
>> > > > >
>> > > > >
>> > > > > My understanding is UNKNOWN_LEADER_EPOCH when a request from a
>> client
>> > > > has a
>> > > > > newer epoch than the broker. So the client is already up to date
>> on
>> > new
>> > > > > leader information, it's the broker that has the catching up to
>> do. I
>> > > > think
>> > > > > there might be some optimisations to make sure the broker
>> refreshes
>> > its
>> > > > > metadata quickly, so it can quickly recover to handle requests
>> that
>> > > > > previously returned 

Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2024

2023-07-21 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 385594 lines...]

* What went wrong:
Execution failed for task ':storage:unitTest'.
> Process 'Gradle Test Executor 140' finished with non-zero exit value 1
  This problem might be caused by incorrect test process configuration.
  For more on test execution, please refer to 
https://docs.gradle.org/8.2.1/userguide/java_testing.html#sec:test_execution in 
the Gradle documentation.

* Try:
> Run with --stacktrace option to get the stack trace.
> Run with --info or --debug option to get more log output.
> Get more help at https://help.gradle.org.

Deprecated Gradle features were used in this build, making it incompatible with 
Gradle 9.0.

You can use '--warning-mode all' to show the individual deprecation warnings 
and determine if they come from your own scripts or plugins.

For more on this, please refer to 
https://docs.gradle.org/8.2.1/userguide/command_line_interface.html#sec:command_line_warnings
 in the Gradle documentation.

BUILD FAILED in 2h 46m 52s
231 actionable tasks: 124 executed, 107 up-to-date

Publishing build scan...
https://ge.apache.org/s/kwgyi7nbbcufe


See the profiling report at: 
file:///home/jenkins/workspace/Kafka_kafka_trunk/build/reports/profile/profile-2023-07-21-08-41-52.html
A fine-grained performance profile is available: use the --scan option.
[Pipeline] }
[Pipeline] // withEnv
[Pipeline] }
[Pipeline] // withEnv
[Pipeline] }
[Pipeline] // withEnv
[Pipeline] }
[Pipeline] // node
[Pipeline] }
[Pipeline] // timestamps
[Pipeline] }
[Pipeline] // timeout
[Pipeline] }
[Pipeline] // stage
[Pipeline] }
Failed in branch JDK 11 and Scala 2.13

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
KStreamKStreamIntegrationTest > shouldOuterJoin() STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
KStreamKStreamIntegrationTest > shouldOuterJoin() PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
KTableKTableForeignKeyInnerJoinMultiIntegrationTest > 
shouldInnerJoinMultiPartitionQueryable() STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
KTableKTableForeignKeyInnerJoinMultiIntegrationTest > 
shouldInnerJoinMultiPartitionQueryable() PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
KTableSourceTopicRestartIntegrationTest > 
shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
KTableSourceTopicRestartIntegrationTest > 
shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
KTableSourceTopicRestartIntegrationTest > 
shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled()
 STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
KTableSourceTopicRestartIntegrationTest > 
shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled()
 PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
KTableSourceTopicRestartIntegrationTest > 
shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled() 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
KTableSourceTopicRestartIntegrationTest > 
shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled() 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
KTableSourceTopicRestartIntegrationTest > 
shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosV2Enabled() 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
KTableSourceTopicRestartIntegrationTest > 
shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosV2Enabled() 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
LagFetchIntegrationTest > shouldFetchLagsDuringRebalancingWithOptimization() 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
LagFetchIntegrationTest > shouldFetchLagsDuringRebalancingWithOptimization() 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
LagFetchIntegrationTest > shouldFetchLagsDuringRestoration() STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
LagFetchIntegrationTest > shouldFetchLagsDuringRestoration() PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
LagFetchIntegrationTest > shouldFetchLagsDuringRebalancingWithNoOptimization() 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
LagFetchIntegrationTest > shouldFetchLagsDuringRebalancingWithNoOptimization() 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 179 > 
OptimizedKTableIntegrationTest > 

[jira] [Resolved] (KAFKA-13431) Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit users)

2023-07-21 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13431.
---
Fix Version/s: 3.6.0
   Resolution: Done

> Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit 
> users)
> ---
>
> Key: KAFKA-13431
> URL: https://issues.apache.org/jira/browse/KAFKA-13431
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Diego Erdody
>Assignee: Yash Mayya
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.6.0
>
>
> There's currently an incompatibility between Sink connectors overriding the 
> {{SinkTask.preCommit}} method (for asynchronous processing) and SMTs that 
> mutate the topic field.
> The problem was present since the {{preCommit}} method inception and is 
> rooted in a mismatch between the topic/partition that is passed to 
> {{open/preCommit}} (the original topic and partition before applying any 
> transformations) and the topic partition that is present in the SinkRecord 
> that the {{SinkTask.put}} method receives (after transformations are 
> applied). Since that's all the information the connector has to implement any 
> kind of internal offset tracking, the topic/partitions it can return in 
> preCommit will correspond to the transformed topic, when the framework 
> actually expects it to be the original topic.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: KIP-919: Allow AdminClient to Talk Directly with the KRaft Controller Quorum

2023-07-21 Thread David Arthur
Hey Colin, thanks for the KIP! Some questions

1) "This registration will include information about the endpoints which
they possess"  Will this include all endpoints, or only those configured in
"advertised.listeners"

2) "Periodically, each controller will check that the controller
registration for its ID is as expected."  Does this need to be a periodic
check? Since the controller registration state will be in the log, can't
the follower just react to unexpected incarnation IDs (after it's caught
up)?

3) ControllerRegistrationRequest has a typo in the listeners section (it
mentions "broker")

4) Since we can't rely on the ApiVersions data, should we remove the field
we added to ApiVersionsResponse in KIP-866?

5)I filed https://issues.apache.org/jira/browse/KAFKA-15230 for the issues
mentioned under "Controller Changes" in case you want to link it

6) I don't see it explicitly mentioned, but I think it's the case that the
active controller must accept and persist any controller registration it
receives. This is unlike the behavior of broker registrations where we can
reject brokers we don't want. For controllers, I don't think we have that
option unless we go for some tighter Raft integration. Since the followers
must be participating in Raft to learn about the leader (and therefore,
will have replayed the full log), we can't really say "no" at that point.


Cheers,
David


On Thu, Jul 20, 2023 at 7:23 PM Colin McCabe  wrote:

> On Tue, Jul 18, 2023, at 09:30, Mickael Maison wrote:
> > H Colin,
> >
> > Thanks for the KIP.
> >
> > Just a few points:
> > 1. As Tom mentioned it would be good to clarify the APIs we expect
> > available on controllers. I assume we want to add DESCRIBE_CONFIGS as
> > part of this KIP.
>
> Hi Mickael,
>
> Yes, this is a good point. I added a table describing the APIs that will
> now be added.
>
> > 2. Currently we have no way of retrieving the list of configs that
> > apply to controllers. It would be good to have an object, so we can
> > add that to the docs but also use that in kafka-configs.
>
> I think this is out of scope.
>
> > 3. Should we have a new entity-type in kafka-configs for setting
> > controller configs?
>
> The BROKER entity type already applies to controllers. It probably needs a
> new name (NODE would be better) but that's out of scope for this KIP, I
> think.
>
> best,
> Colin
>
>
> >
> > Thanks,
> > Mickael
> >
> > On Tue, Jul 4, 2023 at 2:20 PM Luke Chen  wrote:
> >>
> >> Hi Colin,
> >>
> >> Thanks for the answers to my previous questions.
> >>
> >> > Yes, the common thread here is that all of these shell commands
> perform
> >> operations can be done without the broker. So it's reasonable to allow
> them
> >> to be done without going through the broker. I don't know if we need a
> >> separate note for each since the rationale is really the same for all
> (is
> >> it reasonable? if so allow it.)
> >>
> >> Yes, it makes sense. Could we make a note about the main rationale for
> >> selecting these command-line tools in the KIP to make it clear?
> >> Ex: The following command-line tools will get a new
> --bootstrap-controllers
> >> argument (because these shell commands perform operations can be done
> >> without the broker):
> >>
> >> > kafka-reassign-partitions.sh cannot be used to move the
> >> __cluster_metadata topic. However, it can be used to move partitions
> that
> >> reside on the brokers, even when using --bootstrap-controllers to talk
> >> directly to the quorum.
> >>
> >> Fair enough.
> >>
> >>
> >> 4. Does all the command-line tools with `--bootstrap-controllers`
> support
> >> all the options in the tool?
> >> For example, kafka-configs.sh, In addition to the `--alter` option you
> >> mentioned in the example, do we also support `--describe` or `--delete`
> >> option?
> >> If so, do we also support setting "quota" for users/clients/topics...
> via
> >> `--bootstrap-controllers`? (not intuitive, but maybe we just directly
> >> commit the change into the metadata from controller?)
> >>
> >> 5. Do we have any plan for this feature to be completed? v3.6.0?
> >>
> >>
> >> Thank you.
> >> Luke
> >>
> >>
> >> On Fri, Apr 28, 2023 at 1:42 AM Colin McCabe 
> wrote:
> >>
> >> > On Wed, Apr 26, 2023, at 22:08, Luke Chen wrote:
> >> > > Hi Colin,
> >> > >
> >> > > Some comments:
> >> > > 1. I agree we should set "top-level" errors for metadata response
> >> > >
> >> > > 2. In the "brokers" field of metadata response from controller,
> it'll
> >> > > respond with "Controller endpoint information as given in
> >> > > controller.quorum.voters", instead of the "alive"
> controllers(voters).
> >> > That
> >> > > will break the existing admin client because in admin client, we'll
> rely
> >> > on
> >> > > the metadata response to build the "current alive brokers" list, and
> >> > choose
> >> > > one from them to connect (either least load or other criteria). That
> >> > means,
> >> > > if now, we return the value in `controller.quorum.voters`, but one
> of
> 

[jira] [Created] (KAFKA-15232) Move ToolsUtils to tools

2023-07-21 Thread Federico Valeri (Jira)
Federico Valeri created KAFKA-15232:
---

 Summary: Move ToolsUtils to tools
 Key: KAFKA-15232
 URL: https://issues.apache.org/jira/browse/KAFKA-15232
 Project: Kafka
  Issue Type: Sub-task
Reporter: Federico Valeri
 Fix For: 3.6.0


The ToolsUtils class is currently hosted in server-common, but only used by 
tools, so it makes sense to move it to the same module.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-07-21 Thread Federico Valeri
Hi, the point that the legacy approach can only be taken once is
valid, so LGTM. Thanks.

Cheers
Fede

On Fri, Jul 21, 2023 at 4:28 PM Chris Egerton  wrote:
>
> Hi Omnia,
>
> LGTM, thanks! We may want to note the LegacyReplicationPolicy option in the
> rejected alternatives section in case others prefer that option.
>
> Given that we'd like this to land in time for 3.6.0, you may want to start
> a vote thread soon.
>
> Cheers,
>
> Chris
>
> On Fri, Jul 21, 2023 at 10:08 AM Omnia Ibrahim 
> wrote:
>
> > Hi Chris and Federico,
> > thinking about I think Chris's concern is valid and the bigger concern here
> > is that having this `LegacyReplicationPolicy` will eventually open the door
> > for diversion at some point between this `LegacyReplicationPolicy` and the
> > default one.
> > For now, let's have the flag properly fix this bug and we can keep it as an
> > option for people to switch between both behaviours. I know having a
> > bug-fix property is not great but we can treat it as a backward
> > compatibility property instead in order to keep old mirrors using the old
> > internal topics.
> >
> > Hope this is reasonable for the time being.
> >
> > Cheers,
> > Omnia
> >
> > On Wed, Jul 19, 2023 at 11:16 PM Chris Egerton 
> > wrote:
> >
> > > Hi Federico,
> > >
> > > Ah yes, sorry about that! You're correct that this would keep the two
> > > classes in line and largely eliminate the concern I posed about porting
> > > changes to both. Still, I'm a bit hesitant, and I'm not actually certain
> > > that this alternative is more intuitive. The name isn't very descriptive,
> > > and this is the kind of approach we can only really take once; if we
> > break
> > > compatibility again, would we introduce a LegacyLegacyReplicationPolicy?
> > > LegacyReplicationPolicy2? Finally, it seems a bit strange to introduce a
> > > new class to implement a change in behavior this small.
> > >
> > > That said, I don't think this is worth blocking on and wouldn't be
> > opposed
> > > if others felt strongly that a new replication policy class is superior
> > to
> > > a new property on the existing class.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Wed, Jul 19, 2023 at 2:53 PM Federico Valeri 
> > > wrote:
> > >
> > > > Hi Chris, the KIP says it would be a subclass of
> > DefaultReplicationPolicy
> > > > that overrides the ReplicationPolicy.offsetSyncsTopic and
> > > > ReplicationPolicy.checkpointsTopic. So, not much to maintain and it
> > would
> > > > be more intuitive, as you say.
> > > >
> > > > On Wed, Jul 19, 2023, 4:50 PM Chris Egerton 
> > > > wrote:
> > > >
> > > > > HI all,
> > > > >
> > > > > I'm not sure I understand the benefits of introducing a separate
> > > > > replication policy class, besides maybe being more readable/intuitive
> > > to
> > > > > users who would want to know when to use one or the other. It feels
> > > like
> > > > > we've swapped out a "fix the bug" property for an entire "fix the
> > bug"
> > > > > class, and it still leaves the problem of graceful migration from
> > > legacy
> > > > > behavior to new behavior unsolved. It would also require us to
> > consider
> > > > > whether any future changes we make to the DefaultReplicationPolicy
> > > class
> > > > > would have to be ported over to the LegacyReplicationPolicy class as
> > > > well.
> > > > >
> > > > > Perhaps I'm missing something; are there other benefits of
> > introducing
> > > a
> > > > > separate replication policy class?
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Chris
> > > > >
> > > > > On Wed, Jul 19, 2023 at 5:45 AM Omnia Ibrahim <
> > o.g.h.ibra...@gmail.com
> > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Federico,
> > > > > > I like the idea of implementing `LegacyReplicationPolicy` and
> > > avoiding
> > > > > bug
> > > > > > fixes properties. We can drop the idea of the property and just go
> > > > ahead
> > > > > > with introducing the `LegacyReplicationPolicy` and any user upgrade
> > > > from
> > > > > > pre-KIP-690 can use this policy instead of default and no impact
> > will
> > > > > > happen to users upgrading from 3.x to post-KIP-949. We can mark
> > > > > > `LegacyReplicationPolicy` as deprecated later if we want (but not
> > in
> > > > 4.0
> > > > > as
> > > > > > this is very soon) and we can drop it entirely at some point.
> > > > > >
> > > > > > If there's an agreement on this approach I can upgrade the KIP.
> > > > > >
> > > > > > Cheers.
> > > > > > Omnia
> > > > > >
> > > > > > On Wed, Jul 19, 2023 at 8:52 AM Federico Valeri <
> > > fedeval...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi, one way to avoid the "fix the bug property" would be to
> > provide
> > > > > > > and document an additional LegacyReplicationPolicy, but still
> > > keeping
> > > > > > > the current DefaultReplicationPolicy as replication.policy.class
> > > > > > > default value, which is basically one of the workarounds
> > suggested
> > > in
> > > > > > > the KIP.
> > > > > > >
> > > > > > > On Tue, Jul 

Re: [DISCUSS] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-07-21 Thread Chris Egerton
Hi Omnia,

LGTM, thanks! We may want to note the LegacyReplicationPolicy option in the
rejected alternatives section in case others prefer that option.

Given that we'd like this to land in time for 3.6.0, you may want to start
a vote thread soon.

Cheers,

Chris

On Fri, Jul 21, 2023 at 10:08 AM Omnia Ibrahim 
wrote:

> Hi Chris and Federico,
> thinking about I think Chris's concern is valid and the bigger concern here
> is that having this `LegacyReplicationPolicy` will eventually open the door
> for diversion at some point between this `LegacyReplicationPolicy` and the
> default one.
> For now, let's have the flag properly fix this bug and we can keep it as an
> option for people to switch between both behaviours. I know having a
> bug-fix property is not great but we can treat it as a backward
> compatibility property instead in order to keep old mirrors using the old
> internal topics.
>
> Hope this is reasonable for the time being.
>
> Cheers,
> Omnia
>
> On Wed, Jul 19, 2023 at 11:16 PM Chris Egerton 
> wrote:
>
> > Hi Federico,
> >
> > Ah yes, sorry about that! You're correct that this would keep the two
> > classes in line and largely eliminate the concern I posed about porting
> > changes to both. Still, I'm a bit hesitant, and I'm not actually certain
> > that this alternative is more intuitive. The name isn't very descriptive,
> > and this is the kind of approach we can only really take once; if we
> break
> > compatibility again, would we introduce a LegacyLegacyReplicationPolicy?
> > LegacyReplicationPolicy2? Finally, it seems a bit strange to introduce a
> > new class to implement a change in behavior this small.
> >
> > That said, I don't think this is worth blocking on and wouldn't be
> opposed
> > if others felt strongly that a new replication policy class is superior
> to
> > a new property on the existing class.
> >
> > Cheers,
> >
> > Chris
> >
> > On Wed, Jul 19, 2023 at 2:53 PM Federico Valeri 
> > wrote:
> >
> > > Hi Chris, the KIP says it would be a subclass of
> DefaultReplicationPolicy
> > > that overrides the ReplicationPolicy.offsetSyncsTopic and
> > > ReplicationPolicy.checkpointsTopic. So, not much to maintain and it
> would
> > > be more intuitive, as you say.
> > >
> > > On Wed, Jul 19, 2023, 4:50 PM Chris Egerton 
> > > wrote:
> > >
> > > > HI all,
> > > >
> > > > I'm not sure I understand the benefits of introducing a separate
> > > > replication policy class, besides maybe being more readable/intuitive
> > to
> > > > users who would want to know when to use one or the other. It feels
> > like
> > > > we've swapped out a "fix the bug" property for an entire "fix the
> bug"
> > > > class, and it still leaves the problem of graceful migration from
> > legacy
> > > > behavior to new behavior unsolved. It would also require us to
> consider
> > > > whether any future changes we make to the DefaultReplicationPolicy
> > class
> > > > would have to be ported over to the LegacyReplicationPolicy class as
> > > well.
> > > >
> > > > Perhaps I'm missing something; are there other benefits of
> introducing
> > a
> > > > separate replication policy class?
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > On Wed, Jul 19, 2023 at 5:45 AM Omnia Ibrahim <
> o.g.h.ibra...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Hi Federico,
> > > > > I like the idea of implementing `LegacyReplicationPolicy` and
> > avoiding
> > > > bug
> > > > > fixes properties. We can drop the idea of the property and just go
> > > ahead
> > > > > with introducing the `LegacyReplicationPolicy` and any user upgrade
> > > from
> > > > > pre-KIP-690 can use this policy instead of default and no impact
> will
> > > > > happen to users upgrading from 3.x to post-KIP-949. We can mark
> > > > > `LegacyReplicationPolicy` as deprecated later if we want (but not
> in
> > > 4.0
> > > > as
> > > > > this is very soon) and we can drop it entirely at some point.
> > > > >
> > > > > If there's an agreement on this approach I can upgrade the KIP.
> > > > >
> > > > > Cheers.
> > > > > Omnia
> > > > >
> > > > > On Wed, Jul 19, 2023 at 8:52 AM Federico Valeri <
> > fedeval...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi, one way to avoid the "fix the bug property" would be to
> provide
> > > > > > and document an additional LegacyReplicationPolicy, but still
> > keeping
> > > > > > the current DefaultReplicationPolicy as replication.policy.class
> > > > > > default value, which is basically one of the workarounds
> suggested
> > in
> > > > > > the KIP.
> > > > > >
> > > > > > On Tue, Jul 18, 2023 at 9:49 PM Chris Egerton
> > >  > > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > Hi Federico/Omnia,
> > > > > > >
> > > > > > > Generally I like the idea of deprecating and eventually
> removing
> > > "fix
> > > > > the
> > > > > > > bug" properties like this, but 4.0 may be a bit soon. I'm also
> > > unsure
> > > > > of
> > > > > > > how we would instruct users who are relying on the property
> being
> > > set

Re: [DISCUSS] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-07-21 Thread Omnia Ibrahim
Hi Chris and Federico,
thinking about I think Chris's concern is valid and the bigger concern here
is that having this `LegacyReplicationPolicy` will eventually open the door
for diversion at some point between this `LegacyReplicationPolicy` and the
default one.
For now, let's have the flag properly fix this bug and we can keep it as an
option for people to switch between both behaviours. I know having a
bug-fix property is not great but we can treat it as a backward
compatibility property instead in order to keep old mirrors using the old
internal topics.

Hope this is reasonable for the time being.

Cheers,
Omnia

On Wed, Jul 19, 2023 at 11:16 PM Chris Egerton 
wrote:

> Hi Federico,
>
> Ah yes, sorry about that! You're correct that this would keep the two
> classes in line and largely eliminate the concern I posed about porting
> changes to both. Still, I'm a bit hesitant, and I'm not actually certain
> that this alternative is more intuitive. The name isn't very descriptive,
> and this is the kind of approach we can only really take once; if we break
> compatibility again, would we introduce a LegacyLegacyReplicationPolicy?
> LegacyReplicationPolicy2? Finally, it seems a bit strange to introduce a
> new class to implement a change in behavior this small.
>
> That said, I don't think this is worth blocking on and wouldn't be opposed
> if others felt strongly that a new replication policy class is superior to
> a new property on the existing class.
>
> Cheers,
>
> Chris
>
> On Wed, Jul 19, 2023 at 2:53 PM Federico Valeri 
> wrote:
>
> > Hi Chris, the KIP says it would be a subclass of DefaultReplicationPolicy
> > that overrides the ReplicationPolicy.offsetSyncsTopic and
> > ReplicationPolicy.checkpointsTopic. So, not much to maintain and it would
> > be more intuitive, as you say.
> >
> > On Wed, Jul 19, 2023, 4:50 PM Chris Egerton 
> > wrote:
> >
> > > HI all,
> > >
> > > I'm not sure I understand the benefits of introducing a separate
> > > replication policy class, besides maybe being more readable/intuitive
> to
> > > users who would want to know when to use one or the other. It feels
> like
> > > we've swapped out a "fix the bug" property for an entire "fix the bug"
> > > class, and it still leaves the problem of graceful migration from
> legacy
> > > behavior to new behavior unsolved. It would also require us to consider
> > > whether any future changes we make to the DefaultReplicationPolicy
> class
> > > would have to be ported over to the LegacyReplicationPolicy class as
> > well.
> > >
> > > Perhaps I'm missing something; are there other benefits of introducing
> a
> > > separate replication policy class?
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Wed, Jul 19, 2023 at 5:45 AM Omnia Ibrahim  >
> > > wrote:
> > >
> > > > Hi Federico,
> > > > I like the idea of implementing `LegacyReplicationPolicy` and
> avoiding
> > > bug
> > > > fixes properties. We can drop the idea of the property and just go
> > ahead
> > > > with introducing the `LegacyReplicationPolicy` and any user upgrade
> > from
> > > > pre-KIP-690 can use this policy instead of default and no impact will
> > > > happen to users upgrading from 3.x to post-KIP-949. We can mark
> > > > `LegacyReplicationPolicy` as deprecated later if we want (but not in
> > 4.0
> > > as
> > > > this is very soon) and we can drop it entirely at some point.
> > > >
> > > > If there's an agreement on this approach I can upgrade the KIP.
> > > >
> > > > Cheers.
> > > > Omnia
> > > >
> > > > On Wed, Jul 19, 2023 at 8:52 AM Federico Valeri <
> fedeval...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi, one way to avoid the "fix the bug property" would be to provide
> > > > > and document an additional LegacyReplicationPolicy, but still
> keeping
> > > > > the current DefaultReplicationPolicy as replication.policy.class
> > > > > default value, which is basically one of the workarounds suggested
> in
> > > > > the KIP.
> > > > >
> > > > > On Tue, Jul 18, 2023 at 9:49 PM Chris Egerton
> >  > > >
> > > > > wrote:
> > > > > >
> > > > > > Hi Federico/Omnia,
> > > > > >
> > > > > > Generally I like the idea of deprecating and eventually removing
> > "fix
> > > > the
> > > > > > bug" properties like this, but 4.0 may be a bit soon. I'm also
> > unsure
> > > > of
> > > > > > how we would instruct users who are relying on the property being
> > set
> > > > to
> > > > > > "false" to migrate to a version of MM2 that doesn't support
> support
> > > it,
> > > > > > beyond simply implementing their own replication policy--at which
> > > > point,
> > > > > > would we really be doing anyone a favor by forcing them to fork
> the
> > > > > default
> > > > > > policy just to preserve a property we removed?
> > > > > >
> > > > > > I guess right now I'd rather play it safe and not immediately
> > > deprecate
> > > > > the
> > > > > > property. If we can find an easy migration path for users who are
> > > > relying
> > > > > > on it, then I'd be happy to deprecate and 

Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-07-21 Thread Colt McNealy
Sophie—

Thanks for chiming in here. +1 to the idea of specifying the ordering
guarantees that we make in the StorageTypeSpec javadocs.

Quick question then. Is the javadoc that says:

> Order is not guaranteed as bytes lexicographical ordering might not
represent key order.

no longer correct, and should say:

> Order guarantees depend on the underlying implementation of the
ReadOnlyKeyValueStore. For more information, please consult the
[StorageTypeSpec javadocs]()

Thanks,
Colt McNealy

*Founder, LittleHorse.dev*


On Thu, Jul 20, 2023 at 9:28 PM Sophie Blee-Goldman 
wrote:

> Hey Almog, first off, thanks for the KIP! I (and others) raised concerns
> over how restrictive the default.dsl.store config would be if not
> extendable to custom store types, especially given that this seems to be
> the primary userbase of such a feature. At the time we didn't really have
> any better ideas for a clean way to achieve that, but what you proposed
> makes a lot of sense to me. Happy to see a good solution to this, and
> hopefully others will share my satisfaction :P
>
> I did have one quick piece of feedback which arose from an unrelated
> question posed to the dev mailing list w/ subject line
> "ReadOnlyKeyValueStore#range()
> Semantics"
> . I
> recommend checking out the full thread for context, but it made me think
> about how we can leverage the new StoreTypeSpec concept as an answer to the
> long-standing question in Streams: where can we put guarantees of the
> public contract for RocksDB (or other store implementations) when all the
> RocksDB stuff is technically internal.
>
> Basically, I'm suggesting two things: first, call out in some way (perhaps
> the StoreTypeSpec javadocs) that each StoreTypeSpec is considered a public
> contract in itself and should outline any semantic guarantees it does, or
> does not, make. Second, we should add a note on ordering guarantees in the
> two OOTB specs: for RocksDB we assert that range queries will honor
> serialized byte ordering, whereas the InMemory flavor gives no ordering
> guarantee whatsoever at this time.
>
> Thoughts?
>
> -Sophie
>
> On Thu, Jul 20, 2023 at 4:28 PM Almog Gavra  wrote:
>
> > Hi All,
> >
> > I would like to propose a KIP to expand support for default store types
> > (KIP-591) to encompass custom store implementations:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-954%3A+expand+default+DSL+store+configuration+to+custom+types
> >
> > Looking forward to your feedback!
> >
> > Cheers,
> > Almog
> >
>


[jira] [Created] (KAFKA-15231) Add ability to pause/resume Remote Log Manager tasks

2023-07-21 Thread Jorge Esteban Quilcate Otoya (Jira)
Jorge Esteban Quilcate Otoya created KAFKA-15231:


 Summary: Add ability to pause/resume Remote Log Manager tasks 
 Key: KAFKA-15231
 URL: https://issues.apache.org/jira/browse/KAFKA-15231
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Jorge Esteban Quilcate Otoya


Once Tiered Storage is enabled, there may be situations where needed to pause 
uploading tasks to a remote-tier. e.g. remote storage maintenance, 
troubleshooting, etc.

An RSM implementation may not be able to do this by itself without throwing 
exceptions, polluting the logs, etc.

Could we consider adding this ability to the Tiered Storage framework? Remote 
Log Manager seems like a good candidate place for this; though I'm wondering on 
how to expose it.

Would be interested to hear if this sounds like a good idea, and what options 
we have to include these.

We have been considering extending RLM tasks with a pause flag, and having an 
MBean to switch them on demand. Another option may be to extend the Kafka 
protocol to expose this – but seems much moved involved.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Re: [DISCUSS] KIP-951: Leader discovery optimisations for the client

2023-07-21 Thread Mayank Shekhar Narula
Hi David

 01. My reasoning noted in the KIP is that CurrentLeader was first added in
version 12, so 12 is the least version where clients could get these
optimisations. So any client can now choose to implement this with version
12 of the protocol itself. If the version is bumped to X(say 16), then all
clients(including non Java ones) will need to implement all versions from
13-16 to get these optimisations. And since the scenario of leader changing
is not common, so bytes wasted on the network won't be much. What do you
think?

02. For ProduceRequest, same reasoning. And corrected the taggedVersion.

03. Regarding making rack ignorable, my decision was inspired by a similar
field in MetadataResponse. I must admit I am not clear, why was it made
ignorable there originally?


On Fri, Jul 21, 2023 at 11:14 AM David Jacot 
wrote:

> Hi Mayank,
>
> Thanks for the KIP. This is an interesting idea that I have been thinking
> about for a long time so I am happy to see it. The gain is smaller than I
> expected but still worth it in my opinion.
>
> 01. In the FetchResponse, what's the reason for using version `12+` for the
> new tagged field `NodeEndpoints`? Old clients won't use this field because
> they are not aware of it. It seems to me that we are wasting bytes on the
> wire by sending those unnecessarily. Is there a reason that I may have
> missed? Intuitively, I would have bumped the version of the FetchRequest to
> 16 and I would have populated and sent `NodeEndpoints` only if version 16
> is used by the client.
>
> 02. I have the very same question for the ProduceRequest's `CurrentLeader`
> and `NodeEndpoints` fields. Note that the `taggedVerions` of
> `NodeEndpoints` is incorrect.
>
> 03. For both, does the field `Rack` have to really be ignorable? That does
> not seem necessary to me but I may be wrong.
>
> Best,
> David
>
> On Fri, Jul 21, 2023 at 12:32 AM Crispin Bernier
>  wrote:
>
> > Benchmark numbers have been posted on the KIP, please review.
> >
> > On 2023/07/20 13:03:00 Mayank Shekhar Narula wrote:
> > > Jun
> > >
> > > Thanks for the feedback.
> > >
> > > Numbers to follow.
> > >
> > > If we don't plan to
> > > > bump up the FetchResponse version, we could just remove the reference
> > to
> > > > version 16.
> > >
> > > Fixed.
> > >
> > > On Thu, Jul 20, 2023 at 1:28 AM Jun Rao 
> > wrote:
> > >
> > > > Hi, Mayank,
> > > >
> > > > Thanks for the KIP. I agree with others that it would be useful to
> see
> > the
> > > > performance results. Otherwise, just a minor comment. If we don't
> plan
> > to
> > > > bump up the FetchResponse version, we could just remove the reference
> > to
> > > > version 16.
> > > >
> > > > Jun
> > > >
> > > > On Wed, Jul 19, 2023 at 2:31 PM Mayank Shekhar Narula <
> > > > mayanks.nar...@gmail.com> wrote:
> > > >
> > > > > Luke
> > > > >
> > > > > Thanks for the interest in the KIP.
> > > > >
> > > > > But what if the consumer was fetching from the follower?
> > > > >
> > > > > We already include `PreferredReadReplica` in the fetch response.
> > > > > > Should we put the node info of PreferredReadReplica under this
> > case,
> > > > > > instead of the leader's info?
> > > > > >
> > > > >
> > > > > PreferredReadReplica is the decided on the leader. Looking at the
> > Java
> > > > > client code, AbstractFetch::selectReadReplica, first fetch request
> > goes
> > > > to
> > > > > Leader of the partition -> Sends back PreferredReadReplica -> Next
> > fetch
> > > > > uses PreferredReadReplica. So as long as leader is available,
> > > > > PreferredReadReplica would be found in subsequent fetches.
> > > > >
> > > > > Also, under this case, should we include the leader's info in the
> > > > response?
> > > > >
> > > > >
> > > > > In this case, I think the follower would fail the fetch if it
> knows a
> > > > > different leader. If the follower knows a newer leader, it would
> > return
> > > > new
> > > > > leader information in the response, for the client to act on.
> > > > >
> > > > >
> > > > > Will we include the leader/node info in the response when having
> > > > > > `UNKNOWN_LEADER_EPOCH` error?
> > > > >
> > > > >
> > > > > My understanding is UNKNOWN_LEADER_EPOCH when a request from a
> client
> > > > has a
> > > > > newer epoch than the broker. So the client is already up to date on
> > new
> > > > > leader information, it's the broker that has the catching up to
> do. I
> > > > think
> > > > > there might be some optimisations to make sure the broker refreshes
> > its
> > > > > metadata quickly, so it can quickly recover to handle requests that
> > > > > previously returned UNKNOWN_LEADER_EPOCH. But this work is outside
> > the
> > > > > scope of this KIP, as for now this KIP focusses on client-side
> > > > > optimisations.
> > > > >
> > > > > Mayank
> > > > >
> > > > > On Tue, Jul 18, 2023 at 8:51 AM Luke Chen  wrote:
> > > > >
> > > > > > Hi Mayank,
> > > > > >
> > > > > > Thanks for the KIP!
> > > > > >
> > > > > > Some questions:
> > > > > > 1. I can see most of 

[jira] [Created] (KAFKA-15230) ApiVersions data between controllers is not reliable

2023-07-21 Thread David Arthur (Jira)
David Arthur created KAFKA-15230:


 Summary: ApiVersions data between controllers is not reliable
 Key: KAFKA-15230
 URL: https://issues.apache.org/jira/browse/KAFKA-15230
 Project: Kafka
  Issue Type: Bug
Reporter: David Arthur


While testing ZK migrations, I noticed a case where the controller was not 
starting the migration due to the missing ApiVersions data from other 
controllers. This was unexpected because the quorum was running and the 
followers were replicating the metadata log as expected. After examining a heap 
dump of the leader, it was in fact the case that the ApiVersions map of 
NodeApiVersions was empty.

 

After further investigation and offline discussion with [~jsancio], we realized 
that after the initial leader election, the connection from the Raft leader to 
the followers will become idle and eventually timeout and close. This causes 
NetworkClient to purge the NodeApiVersions data for the closed connections.

 

There are two main side effects of this behavior: 

1) If migrations are not started within the idle timeout period (10 minutes, by 
default), then they will not be able to be started. After this timeout period, 
I was unable to restart the controllers in such a way that the leader had 
active connections with all followers.

2) Dynamically updating features, such as "metadata.version", is not guaranteed 
to be safe

 

There is a partial workaround for the migration issue. If we set "
connections.max.idle.ms" to -1, the Raft leader will never disconnect from the 
followers. However, if a follower restarts, the leader will not re-establish a 
connection.
 
The feature update issue has no safe workarounds.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Request permission to contribute

2023-07-21 Thread Taras Ledkov
Hi, Kafka Team.

I'm following this wiki to request permission to contribute to Apache Kafka
https://cwiki.apache.org/confluence/display/kafka/kafka+improvement+proposals#KafkaImprovementProposals-GettingStarted

I'll propose custom SSL factory for Kafka Connect REST server 
[org.apache.kafka.connect.runtime.rest.RestServer].
Kafka connect REST server can be configured only with file based key stores in 
current implementation.

My wiki ID and jira ID are both: tledkov (tled...@apache.org)
Can I get permission please?

--
With best regards,
Taras Ledkov



[jira] [Created] (KAFKA-15229) Increase default value of task.shutdown.graceful.timeout.ms

2023-07-21 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-15229:
-

 Summary: Increase default value of 
task.shutdown.graceful.timeout.ms
 Key: KAFKA-15229
 URL: https://issues.apache.org/jira/browse/KAFKA-15229
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Sagar Rao
Assignee: Sagar Rao


The Kafka Connect config [task.shutdown.graceful.timeout.ms. 
|https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms]has
 a default value of 5s. As per it's definition:

 
{noformat}
Amount of time to wait for tasks to shutdown gracefully. This is the total 
amount of time, not per task. All task have shutdown triggered, then they are 
waited on sequentially.{noformat}

it is the total timeout for all tasks to shutdown. Also, if multiple tasks are 
to be shutdown then, they are waited upon sequentially. Now the default value 
of this config is ok for smaller clusters with less number of tasks, on a 
larger cluster because the timeout can elapse we will see a lot of messages of 
the form 

```
Graceful stop of task  failed.
```

In case of failure in graceful stop of tasks, the tasks are cancelled which 
means that they won't send out a status update. Once that happens there won't 
be any `UNASSIGNED` status message posted for that task. Let's say the task 
stop was triggered by a worker going down. If the cluster is configured to use 
Incremental Cooperative Assignor, then the task wouldn't be reassigned until 
scheduled.rebalance.delay.max.ms interval elapses. So, for that amount of 
duration, the task would show up with status RUNNING whenever it's status is 
queried for. This can be confusing for the users.

This problem can be exacerbated on cloud environments(like kubernetes pods) 
because there is a high chance that the running status would be associated with 
an older worker_id which doesn't even exist in the cluster anymore. 

While the net effect of all of this is not catastrophic i.e it won't lead to 
any processing delays  or loss of data but the status of the task would be off. 
And if there are fast rebalances happening under Incremental Cooperative 
Assignor, then that duration could be high as well. 

So, the proposal is to increase the default value to a higher value. I am 
thinking we can set it to 60s because as far as I can see, it doesn't interfere 
with any other timeout that we have. 

I am tagging this as need-kip because I believe we will need one.






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Re: [DISCUSS] KIP-951: Leader discovery optimisations for the client

2023-07-21 Thread Ismael Juma
Thanks for the update Crispin - very helpful to have actual performance
data. 2-5% for the default configuration is a bit on the low side for this
kind of proposal.

Ismael

On Thu, Jul 20, 2023 at 11:33 PM Crispin Bernier
 wrote:

> Benchmark numbers have been posted on the KIP, please review.
>
> On 2023/07/20 13:03:00 Mayank Shekhar Narula wrote:
> > Jun
> >
> > Thanks for the feedback.
> >
> > Numbers to follow.
> >
> > If we don't plan to
> > > bump up the FetchResponse version, we could just remove the reference
> to
> > > version 16.
> >
> > Fixed.
> >
> > On Thu, Jul 20, 2023 at 1:28 AM Jun Rao 
> wrote:
> >
> > > Hi, Mayank,
> > >
> > > Thanks for the KIP. I agree with others that it would be useful to see
> the
> > > performance results. Otherwise, just a minor comment. If we don't plan
> to
> > > bump up the FetchResponse version, we could just remove the reference
> to
> > > version 16.
> > >
> > > Jun
> > >
> > > On Wed, Jul 19, 2023 at 2:31 PM Mayank Shekhar Narula <
> > > mayanks.nar...@gmail.com> wrote:
> > >
> > > > Luke
> > > >
> > > > Thanks for the interest in the KIP.
> > > >
> > > > But what if the consumer was fetching from the follower?
> > > >
> > > > We already include `PreferredReadReplica` in the fetch response.
> > > > > Should we put the node info of PreferredReadReplica under this
> case,
> > > > > instead of the leader's info?
> > > > >
> > > >
> > > > PreferredReadReplica is the decided on the leader. Looking at the
> Java
> > > > client code, AbstractFetch::selectReadReplica, first fetch request
> goes
> > > to
> > > > Leader of the partition -> Sends back PreferredReadReplica -> Next
> fetch
> > > > uses PreferredReadReplica. So as long as leader is available,
> > > > PreferredReadReplica would be found in subsequent fetches.
> > > >
> > > > Also, under this case, should we include the leader's info in the
> > > response?
> > > >
> > > >
> > > > In this case, I think the follower would fail the fetch if it knows a
> > > > different leader. If the follower knows a newer leader, it would
> return
> > > new
> > > > leader information in the response, for the client to act on.
> > > >
> > > >
> > > > Will we include the leader/node info in the response when having
> > > > > `UNKNOWN_LEADER_EPOCH` error?
> > > >
> > > >
> > > > My understanding is UNKNOWN_LEADER_EPOCH when a request from a client
> > > has a
> > > > newer epoch than the broker. So the client is already up to date on
> new
> > > > leader information, it's the broker that has the catching up to do. I
> > > think
> > > > there might be some optimisations to make sure the broker refreshes
> its
> > > > metadata quickly, so it can quickly recover to handle requests that
> > > > previously returned UNKNOWN_LEADER_EPOCH. But this work is outside
> the
> > > > scope of this KIP, as for now this KIP focusses on client-side
> > > > optimisations.
> > > >
> > > > Mayank
> > > >
> > > > On Tue, Jul 18, 2023 at 8:51 AM Luke Chen  wrote:
> > > >
> > > > > Hi Mayank,
> > > > >
> > > > > Thanks for the KIP!
> > > > >
> > > > > Some questions:
> > > > > 1. I can see most of the cases we only care about consumer fetch
> from
> > > the
> > > > > leader.
> > > > > But what if the consumer was fetching from the follower?
> > > > > We already include `PreferredReadReplica` in the fetch response.
> > > > > Should we put the node info of PreferredReadReplica under this
> case,
> > > > > instead of the leader's info?
> > > > > Also, under this case, should we include the leader's info in the
> > > > response?
> > > > >
> > > > > 2. Will we include the leader/node info in the response when having
> > > > > `UNKNOWN_LEADER_EPOCH` error?
> > > > > I think it's fine we ignore the `UNKNOWN_LEADER_EPOCH` error since
> when
> > > > > this happens, the node might have some error which should refresh
> the
> > > > > metadata. On the other hand, it might also be good if we can heal
> the
> > > > node
> > > > > soon to do produce/consume works.
> > > > >
> > > > >
> > > > > Thank you.
> > > > > Luke
> > > > >
> > > > > On Tue, Jul 18, 2023 at 2:00 AM Philip Nee 
> > > wrote:
> > > > >
> > > > > > Hey Mayank:
> > > > > >
> > > > > > For #1: I think fetch and produce behave a bit differently on
> > > metadata.
> > > > > > Maybe it is worth highlighting the changes for each client in
> detail.
> > > > In
> > > > > > producer did you mean by the metadata timeout before sending out
> > > > produce
> > > > > > requests? For consumer: I think for fetches it requires user to
> retry
> > > > if
> > > > > > the position does not exist on the leader. I don't have the
> detail on
> > > > top
> > > > > > of my head, but I think we should lay out these behavioral
> changes.
> > > > > >
> > > > > > For #3: Thanks for the clarification.
> > > > > >
> > > > > > On Mon, Jul 17, 2023 at 10:39 AM Mayank Shekhar Narula <
> > > > > > mayanks.nar...@gmail.com> wrote:
> > > > > >
> > > > > > > Philip
> > > > > > >
> > > > > > > 1. Good call out about "poll" behaviour, 

Re: Re: [DISCUSS] KIP-951: Leader discovery optimisations for the client

2023-07-21 Thread David Jacot
Hi Mayank,

Thanks for the KIP. This is an interesting idea that I have been thinking
about for a long time so I am happy to see it. The gain is smaller than I
expected but still worth it in my opinion.

01. In the FetchResponse, what's the reason for using version `12+` for the
new tagged field `NodeEndpoints`? Old clients won't use this field because
they are not aware of it. It seems to me that we are wasting bytes on the
wire by sending those unnecessarily. Is there a reason that I may have
missed? Intuitively, I would have bumped the version of the FetchRequest to
16 and I would have populated and sent `NodeEndpoints` only if version 16
is used by the client.

02. I have the very same question for the ProduceRequest's `CurrentLeader`
and `NodeEndpoints` fields. Note that the `taggedVerions` of
`NodeEndpoints` is incorrect.

03. For both, does the field `Rack` have to really be ignorable? That does
not seem necessary to me but I may be wrong.

Best,
David

On Fri, Jul 21, 2023 at 12:32 AM Crispin Bernier
 wrote:

> Benchmark numbers have been posted on the KIP, please review.
>
> On 2023/07/20 13:03:00 Mayank Shekhar Narula wrote:
> > Jun
> >
> > Thanks for the feedback.
> >
> > Numbers to follow.
> >
> > If we don't plan to
> > > bump up the FetchResponse version, we could just remove the reference
> to
> > > version 16.
> >
> > Fixed.
> >
> > On Thu, Jul 20, 2023 at 1:28 AM Jun Rao 
> wrote:
> >
> > > Hi, Mayank,
> > >
> > > Thanks for the KIP. I agree with others that it would be useful to see
> the
> > > performance results. Otherwise, just a minor comment. If we don't plan
> to
> > > bump up the FetchResponse version, we could just remove the reference
> to
> > > version 16.
> > >
> > > Jun
> > >
> > > On Wed, Jul 19, 2023 at 2:31 PM Mayank Shekhar Narula <
> > > mayanks.nar...@gmail.com> wrote:
> > >
> > > > Luke
> > > >
> > > > Thanks for the interest in the KIP.
> > > >
> > > > But what if the consumer was fetching from the follower?
> > > >
> > > > We already include `PreferredReadReplica` in the fetch response.
> > > > > Should we put the node info of PreferredReadReplica under this
> case,
> > > > > instead of the leader's info?
> > > > >
> > > >
> > > > PreferredReadReplica is the decided on the leader. Looking at the
> Java
> > > > client code, AbstractFetch::selectReadReplica, first fetch request
> goes
> > > to
> > > > Leader of the partition -> Sends back PreferredReadReplica -> Next
> fetch
> > > > uses PreferredReadReplica. So as long as leader is available,
> > > > PreferredReadReplica would be found in subsequent fetches.
> > > >
> > > > Also, under this case, should we include the leader's info in the
> > > response?
> > > >
> > > >
> > > > In this case, I think the follower would fail the fetch if it knows a
> > > > different leader. If the follower knows a newer leader, it would
> return
> > > new
> > > > leader information in the response, for the client to act on.
> > > >
> > > >
> > > > Will we include the leader/node info in the response when having
> > > > > `UNKNOWN_LEADER_EPOCH` error?
> > > >
> > > >
> > > > My understanding is UNKNOWN_LEADER_EPOCH when a request from a client
> > > has a
> > > > newer epoch than the broker. So the client is already up to date on
> new
> > > > leader information, it's the broker that has the catching up to do. I
> > > think
> > > > there might be some optimisations to make sure the broker refreshes
> its
> > > > metadata quickly, so it can quickly recover to handle requests that
> > > > previously returned UNKNOWN_LEADER_EPOCH. But this work is outside
> the
> > > > scope of this KIP, as for now this KIP focusses on client-side
> > > > optimisations.
> > > >
> > > > Mayank
> > > >
> > > > On Tue, Jul 18, 2023 at 8:51 AM Luke Chen  wrote:
> > > >
> > > > > Hi Mayank,
> > > > >
> > > > > Thanks for the KIP!
> > > > >
> > > > > Some questions:
> > > > > 1. I can see most of the cases we only care about consumer fetch
> from
> > > the
> > > > > leader.
> > > > > But what if the consumer was fetching from the follower?
> > > > > We already include `PreferredReadReplica` in the fetch response.
> > > > > Should we put the node info of PreferredReadReplica under this
> case,
> > > > > instead of the leader's info?
> > > > > Also, under this case, should we include the leader's info in the
> > > > response?
> > > > >
> > > > > 2. Will we include the leader/node info in the response when having
> > > > > `UNKNOWN_LEADER_EPOCH` error?
> > > > > I think it's fine we ignore the `UNKNOWN_LEADER_EPOCH` error since
> when
> > > > > this happens, the node might have some error which should refresh
> the
> > > > > metadata. On the other hand, it might also be good if we can heal
> the
> > > > node
> > > > > soon to do produce/consume works.
> > > > >
> > > > >
> > > > > Thank you.
> > > > > Luke
> > > > >
> > > > > On Tue, Jul 18, 2023 at 2:00 AM Philip Nee 
> > > wrote:
> > > > >
> > > > > > Hey Mayank:
> > > > > >
> > > > > > For #1: I think fetch and 

Re: [DISCUSS] KIP-943: Add independent "offset.storage.segment.bytes" for connect-distributed.properties

2023-07-21 Thread Sagar
Hey Hudeqi,

Thanks for the KIP! After reading the KIP and the comments by Yash and Greg
I agree with these aspects:

1) While I agree that having a high value for segment.btes config can lead
to higher startup times, we don't necessarily need to expose a separate
config for it(as Yash suggested). We can use the same mechanism as exposed
by KIP-605 to set it to a lower value specifically for Connect. IMO this
could be an internal config as well defined via ConfigDef#defineInternal so
they don't need to show up in the docs as well. I haven't tested it but if
the users do happen to override the config via the KIP-605 mechanism, it
should update. So, the scope of the KIP could be reduced to having an
explicit internal config for offset's topic segment.bytes with a lower
default value. WDYT?

2) I don't think we should let the configs of existing topics be updated.
If you notice both KIP-605 and KIP-154 (the one which 605 cites) don't
allow updating the configs of existing topics. It would be a good idea to
stick around with this practice imo.

3) Regarding the default value of 50 MB, tbh I am not totally aware of how
the default values for these configs were chosen in the past. But as
pointed out by Greg, __consumer_offsets topic could be a good example to
follow and a default value of 100MB could be a good starting point. Or if
needed we can be defensive and start with a slightly higher value like
250MB. Also the point about tombstone records leading to inconsistent
in-memory states across multiple workers is a good one. This happens with
status topic as well IIRC and if possible we should look to fix it. That is
outside the scope of the KIP though.

Thanks!
Sagar.


On Fri, Jul 14, 2023 at 1:49 AM Greg Harris 
wrote:

> Hey hudeqi,
>
> Thanks for the KIP! I did not know about the existing segment.bytes
> default value, and it does seem rather high in the context of the
> Connect internal topics.
> If I think about the segment.size as a "minimum per-partition data
> transfer on startup", 1GB is certainly not appropriate for even the
> single-partition config topic.
>
> 1. I have a concern about changing the topic configuration on startup.
>
> In the existing codebase, the *.storage.* worker configurations appear
> to only have an effect for newly created topics. If the topics were
> manually created before a Connect cluster starts, or were created by a
> previous Connect instance, then the Connect worker configuration could
> have arbitrary contents that have no effect. Updating the topic
> configurations after creation would be a new capability.
> Consider the situation where someone were to notice this log.segment
> problem, where a natural response would be to reconfigure the topic,
> diverging from the two configurations. When the worker can change the
> topic configuration after creation, that has the potential to roll
> back topic configurations that are managed externally.
> Do you think that changing the default for new Connect clusters, and
> emitting a startup warning for excessive segment.bytes is reasonable?
> We have other startup assertions that fail the startup of a worker
> based on partition and compaction requirements, and this would be
> similar in that it alerts the user to reconfigure the internal topics,
> but with a lesser severity.
>
> 2. I'm also interested to know what a reasonable value for this
> configuration would be. I did find the __consumer_offsets topic uses
> 104857600 (100 MiB) as defined in OffsetConfig.scala, so there is
> precedent for having a smaller segment.size for internal topics.
>
> 3. I believe there's a potential bug where compaction can happen
> before a worker reads a tombstone, leading the KafkaBasedLog to
> produce inconsistent in-memory states across multiple workers. Since
> the segment.size is so large, it makes me think that compaction has
> been wholly ineffective so far, and has prevented this bug from
> manifesting. By lowering the segment.size, we're increasing the
> likelihood of this failure, so it may need to finally be addressed.
>
> Thanks,
> Greg
>
>
>
>
> On Thu, Jul 6, 2023 at 5:39 AM Yash Mayya  wrote:
> >
> > Also, just adding to the above point - we don't necessarily need to
> > explicitly add a new worker configuration right? Instead, we could
> > potentially just use the new proposed default value internally which can
> be
> > overridden by users through setting a value for
> > "offset.storage.segment.bytes" (via the existing KIP-605 based
> mechanism).
> >
> > On Thu, Jul 6, 2023 at 6:04 PM Yash Mayya  wrote:
> >
> > > Hi hudeqi,
> > >
> > > Thanks for the KIP! Just to clarify - since KIP-605 (
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-605%3A+Expand+Connect+Worker+Internal+Topic+Settings
> )
> > > already allows configuring "segment.bytes" for the Connect cluster's
> > > offsets topic via a worker configuration
> ("offset.storage.segment.bytes",
> > > same as what is being proposed in this KIP), the primary 

[jira] [Reopened] (KAFKA-14581) Move GetOffsetShell to tools

2023-07-21 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri reopened KAFKA-14581:
-

> Move GetOffsetShell to tools
> 
>
> Key: KAFKA-14581
> URL: https://issues.apache.org/jira/browse/KAFKA-14581
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Ruslan Krivoshein
>Priority: Major
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15222) Upgrade zinc scala incremental compiler plugin version to a latests stable fit version (1.9.2)

2023-07-21 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya resolved KAFKA-15222.
--
Fix Version/s: 3.6.0
   Resolution: Fixed

> Upgrade zinc scala incremental compiler plugin version to a latests stable 
> fit version (1.9.2)
> --
>
> Key: KAFKA-15222
> URL: https://issues.apache.org/jira/browse/KAFKA-15222
> Project: Kafka
>  Issue Type: Improvement
>  Components: build, tools
>Reporter: Said BOUDJELDA
>Assignee: Said BOUDJELDA
>Priority: Minor
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)