Re: [ANNOUNCE] Apache Kafka 3.7.0

2024-02-29 Thread Guozhang Wang
Thanks Stan for running the release!

On Thu, Feb 29, 2024 at 5:39 AM Boudjelda Mohamed Said
 wrote:
>
> Thanks Stanislav for running the release!
>
> On Wed, Feb 28, 2024 at 10:36 PM Kirk True  wrote:
>
> > Thanks Stanislav
> >
> > > On Feb 27, 2024, at 10:01 AM, Stanislav Kozlovski <
> > stanislavkozlov...@apache.org> wrote:
> > >
> > > The Apache Kafka community is pleased to announce the release of
> > > Apache Kafka 3.7.0
> > >
> > > This is a minor release that includes new features, fixes, and
> > > improvements from 296 JIRAs
> > >
> > > An overview of the release and its notable changes can be found in the
> > > release blog post:
> > > https://kafka.apache.org/blog#apache_kafka_370_release_announcement
> > >
> > > All of the changes in this release can be found in the release notes:
> > > https://www.apache.org/dist/kafka/3.7.0/RELEASE_NOTES.html
> > >
> > > You can download the source and binary release (Scala 2.12, 2.13) from:
> > > https://kafka.apache.org/downloads#3.7.0
> > >
> > >
> > ---
> > >
> > >
> > > Apache Kafka is a distributed streaming platform with four core APIs:
> > >
> > >
> > > ** The Producer API allows an application to publish a stream of records
> > to
> > > one or more Kafka topics.
> > >
> > > ** The Consumer API allows an application to subscribe to one or more
> > > topics and process the stream of records produced to them.
> > >
> > > ** The Streams API allows an application to act as a stream processor,
> > > consuming an input stream from one or more topics and producing an
> > > output stream to one or more output topics, effectively transforming the
> > > input streams to output streams.
> > >
> > > ** The Connector API allows building and running reusable producers or
> > > consumers that connect Kafka topics to existing applications or data
> > > systems. For example, a connector to a relational database might
> > > capture every change to a table.
> > >
> > >
> > > With these APIs, Kafka can be used for two broad classes of application:
> > >
> > > ** Building real-time streaming data pipelines that reliably get data
> > > between systems or applications.
> > >
> > > ** Building real-time streaming applications that transform or react
> > > to the streams of data.
> > >
> > >
> > > Apache Kafka is in use at large and small companies worldwide, including
> > > Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
> > > Target, The New York Times, Uber, Yelp, and Zalando, among others.
> > >
> > > A big thank you to the following 146 contributors to this release!
> > > (Please report an unintended omission)
> > >
> > > Abhijeet Kumar, Akhilesh Chaganti, Alieh, Alieh Saeedi, Almog Gavra,
> > > Alok Thatikunta, Alyssa Huang, Aman Singh, Andras Katona, Andrew
> > > Schofield, Anna Sophie Blee-Goldman, Anton Agestam, Apoorv Mittal,
> > > Arnout Engelen, Arpit Goyal, Artem Livshits, Ashwin Pankaj,
> > > ashwinpankaj, atu-sharm, bachmanity1, Bob Barrett, Bruno Cadonna,
> > > Calvin Liu, Cerchie, chern, Chris Egerton, Christo Lolov, Colin
> > > Patrick McCabe, Colt McNealy, Crispin Bernier, David Arthur, David
> > > Jacot, David Mao, Deqi Hu, Dimitar Dimitrov, Divij Vaidya, Dongnuo
> > > Lyu, Eaugene Thomas, Eduwer Camacaro, Eike Thaden, Federico Valeri,
> > > Florin Akermann, Gantigmaa Selenge, Gaurav Narula, gongzhongqiang,
> > > Greg Harris, Guozhang Wang, Gyeongwon, Do, Hailey Ni, Hanyu Zheng, Hao
> > > Li, Hector Geraldino, hudeqi, Ian McDonald, Iblis Lin, Igor Soarez,
> > > iit2009060, Ismael Juma, Jakub Scholz, James Cheng, Jason Gustafson,
> > > Jay Wang, Jeff Kim, Jim Galasyn, John Roesler, Jorge Esteban Quilcate
> > > Otoya, Josep Prat, José Armando García Sancio, Jotaniya Jeel, Jouni
> > > Tenhunen, Jun Rao, Justine Olshan, Kamal Chandraprakash, Kirk True,
> > > kpatelatwork, kumarpritam863, Laglangyue, Levani Kokhreidze, Lianet
> > > Magrans, Liu Zeyu, Lucas Brutschy, Lucia Cerchie, Luke Chen, maniekes,
> > > Manikumar Reddy, mannoopj, Maros Orsak, Matthew de Detrich, Matthias
> > > J. Sax, Max Riedel, Mayank Shekhar Narula, Mehari Beyene, Michael
> > > Westerby, Mickael Maison, Nick Telford, Nikhi

Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-12-24 Thread Guozhang Wang
Thanks Almog, I read https://github.com/apache/kafka/pull/15061/files
and I think the additional API is the right way of fixing it.


Guozhang

On Fri, Dec 22, 2023 at 9:14 AM Almog Gavra  wrote:
>
> Hello Everyone! I updated the KIP once more as a result of a bug
> investigation - I added DslWindowParams#isTimestamped to the public API as
> a result of https://issues.apache.org/jira/browse/KAFKA-16046. Please let
> me know if there are any concerns with this addition.
>
> On Thu, Dec 14, 2023 at 5:40 PM Almog Gavra  wrote:
>
> > Sorry for the late response to the late reply, hah! I didn't give any
> > thought about how we would want to integrate this custom store supplier
> > with querying of the stores. My initial intuition suggests that we'd
> > probably want a separate API for that, or just recommend people to query
> > their external stores outside of the context of Kafka Streams (with the
> > understanding that there are fewer semantic guarantees).
> >
> > On Sat, Dec 2, 2023 at 9:38 AM Guozhang Wang 
> > wrote:
> >
> >> Hey Almog,
> >>
> >> Sorry for the late reply.
> >>
> >> Re: 2) above, maybe I'm just overthinking it. What I had in mind is
> >> the case where we have, say, a remote store impl customized by the users.
> >> Besides being used inside the KS app itself, the user may try to
> >> access the store instance outside the KS app as well? If that's the
> >> case, maybe it's still worth having an interface from KS to expose the
> >> store instance directly.
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Sun, Nov 19, 2023 at 5:26 PM Almog Gavra 
> >> wrote:
> >> >
> >> > Hello Guozhang,
> >> >
> >> > Thanks for the feedback! For 1 there are tests verifying this and I did
> >> so
> >> > manually as well, it does not reveal anything about the store types --
> >> just
> >> > the names, so I think we're good there. I've put an example at the
> >> bottom
> >> > of this reply for people following the conversation.
> >> >
> >> > I'm not sure I understand your question about 2. What's the integration
> >> > point with the actual store for this external component? What does that
> >> > have to do with this PR/how does it differ from what's available today
> >> > (with the default.dsl.store configuration)? In either scenario, getting
> >> the
> >> > actual instantiated store supplier must be done only after the topology
> >> is
> >> > built and rewritten (it can be passed in either via
> >> > Materialized/StreamJoined in the DSL code, via TopologyConfig overrides
> >> or
> >> > in the global StreamsConfig passed in to KafkaStreams). Today, AFAIK,
> >> this
> >> > isn't possible (you can't get from the built topology the instantiated
> >> > store supplier).
> >> >
> >> > Thanks,
> >> > Almog
> >> >
> >> > 
> >> >
> >> > Topologies:
> >> >    Sub-topology: 0
> >> >     Source: KSTREAM-SOURCE-00 (topics: [test_topic])
> >> >       --> KSTREAM-TRANSFORMVALUES-01
> >> >     Processor: KSTREAM-TRANSFORMVALUES-01 (stores: [])
> >> >       --> Aggregate-Prepare
> >> >       <-- KSTREAM-SOURCE-00
> >> >     Processor: Aggregate-Prepare (stores: [])
> >> >       --> KSTREAM-AGGREGATE-03
> >> >       <-- KSTREAM-TRANSFORMVALUES-01
> >> >     Processor: KSTREAM-AGGREGATE-03 (stores: [Aggregate-Aggregate-Materialize])
> >> >       --> Aggregate-Aggregate-ToOutputSchema
> >> >       <-- Aggregate-Prepare
> >> >     Processor: Aggregate-Aggregate-ToOutputSchema (stores: [])
> >> >       --> Aggregate-Project
> >> >       <-- KSTREAM-AGGREGATE-03
> >> >     Processor: Aggregate-Project (stores: [])
> >> >       --> KTABLE-TOSTREAM-06
> >> >       <-- Aggregate-Aggregate-ToOutputSchema
> >> >     Processor: KTABLE-TOSTREAM-06 (stores: [])
> >> >       --> KSTREAM-SINK-07
> >> >       <-- Aggregate-Project
> >> >     Sink: KSTREAM-SINK-07 (topic: S2)
> >> >       <-- KTABLE-TOSTREAM-06
> >> >
> >> > On Sat, Nov 18, 2023 at 6:05 PM Guozhang Wang <
> >> guozhang

Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-12-02 Thread Guozhang Wang
Hey Almog,

Sorry for the late reply.

Re: 2) above, maybe I'm just overthinking it. What I had in mind is
the case where we have, say, a remote store impl customized by the users.
Besides being used inside the KS app itself, the user may try to
access the store instance outside the KS app as well? If that's the
case, maybe it's still worth having an interface from KS to expose the
store instance directly.


Guozhang


On Sun, Nov 19, 2023 at 5:26 PM Almog Gavra  wrote:
>
> Hello Guozhang,
>
> Thanks for the feedback! For 1 there are tests verifying this and I did so
> manually as well, it does not reveal anything about the store types -- just
> the names, so I think we're good there. I've put an example at the bottom
> of this reply for people following the conversation.
>
> I'm not sure I understand your question about 2. What's the integration
> point with the actual store for this external component? What does that
> have to do with this PR/how does it differ from what's available today
> (with the default.dsl.store configuration)? In either scenario, getting the
> actual instantiated store supplier must be done only after the topology is
> built and rewritten (it can be passed in either via
> Materialized/StreamJoined in the DSL code, via TopologyConfig overrides or
> in the global StreamsConfig passed in to KafkaStreams). Today, AFAIK, this
> isn't possible (you can't get from the built topology the instantiated
> store supplier).
>
> Thanks,
> Almog
>
> 
>
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-00 (topics: [test_topic])
>       --> KSTREAM-TRANSFORMVALUES-01
>     Processor: KSTREAM-TRANSFORMVALUES-01 (stores: [])
>       --> Aggregate-Prepare
>       <-- KSTREAM-SOURCE-00
>     Processor: Aggregate-Prepare (stores: [])
>       --> KSTREAM-AGGREGATE-03
>       <-- KSTREAM-TRANSFORMVALUES-01
>     Processor: KSTREAM-AGGREGATE-03 (stores: [Aggregate-Aggregate-Materialize])
>       --> Aggregate-Aggregate-ToOutputSchema
>       <-- Aggregate-Prepare
>     Processor: Aggregate-Aggregate-ToOutputSchema (stores: [])
>       --> Aggregate-Project
>       <-- KSTREAM-AGGREGATE-03
>     Processor: Aggregate-Project (stores: [])
>       --> KTABLE-TOSTREAM-06
>       <-- Aggregate-Aggregate-ToOutputSchema
>     Processor: KTABLE-TOSTREAM-06 (stores: [])
>       --> KSTREAM-SINK-07
>       <-- Aggregate-Project
>     Sink: KSTREAM-SINK-07 (topic: S2)
>       <-- KTABLE-TOSTREAM-06
>
> On Sat, Nov 18, 2023 at 6:05 PM Guozhang Wang 
> wrote:
>
> > Hello Almog,
> >
> > I left a comment in the PR before I got to read the newest updates
> > from this thread. My 2c:
> >
> > 1. I liked the idea of delaying the instantiation of StoreBuilder from
> > suppliers after the Topology is created. It has been a bit annoying
> > for many other features we were trying back then. The only thing is,
> > we need to check when we call Topology.describe() which gets a
> > TopologyDescription, does that reveal anything about the source of
> > truth store impl types already; if it does not, then we are good to
> > go.
> >
> > 2. I originally thought (and commented in the PR) that maybe we can
> > just add this new func "resolveDslStoreSuppliers" into StreamsConfig
> > directly and mark it as EVOLVING, because I was not clear that we are
> > trying to do 1) above. Now I'm leaning more towards what you proposed.
> > But I still have a question in mind: even after we've done
> > https://github.com/apache/kafka/pull/14548 later, don't we still need
> > some interface that users can call to get the actual instantiated
> > store supplier for cases where some external custom logic, like an
> > external controller / scheduler which is developed by a different
> > group of people rather than the Streams app developers themselves,
> > that can only turn on certain features after learning the actual store
> > impl suppliers used?
> >
> > Guozhang
> >
> > On Sat, Nov 18, 2023 at 2:46 PM Almog Gavra  wrote:
> > >
> > > Hello Everyone - one more minor change to the KIP that came up during
> > > implementation (reflected in the KIP itself). I will be adding the method
> > > below to TopologyConfig. This allows us to determine whether or not the
> > > DslStoreSuppliers was explicitly passed in via either
> > > DSL_STORE_SUPPLIERS_CLASS_CONFIG or DEFAULT_DSL_STORE_CONFIG (if it was
> > not
> > > explicitly passed in, we will use the one that is configured in

Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-11-18 Thread Guozhang Wang
; that introduced default.dsl.store!
> > >>>
> > >>> On Fri, Jul 28, 2023 at 4:55 PM Almog Gavra 
> > >> wrote:
> > >>>
> > >>>> OK! I think I got everything, but I'll give the KIP another read with
> > >>>> fresh eyes later. Just a reminder that the voting is open, so go out
> > and
> > >>>> exercise your civic duty! ;)
> > >>>>
> > >>>> - Almog
> > >>>>
> > >>>> On Fri, Jul 28, 2023 at 10:38 AM Almog Gavra 
> > >>>> wrote:
> > >>>>
> > >>>>> Thanks Guozhang & Sophie:
> > >>>>>
> > >>>>> A2. Will clarify in the KIP
> > >>>>> A3. Will change back to the deprecated version!
> > >>>>> A5. Seems like I'm outnumbered... DslStoreSuppliers it is.
> > >>>>>
> > >>>>> Will update the KIP today.
> > >>>>>
> > >>>>> - Almog
> > >>>>>
> > >>>>> On Thu, Jul 27, 2023 at 12:42 PM Guozhang Wang <
> > >>>>> guozhang.wang...@gmail.com> wrote:
> > >>>>>
> > >>>>>> Yes, that sounds right to me. Thanks Sophie.
> > >>>>>>
> > >>>>>> On Thu, Jul 27, 2023 at 12:35 PM Sophie Blee-Goldman
> > >>>>>>  wrote:
> > >>>>>>>
> > >>>>>>> A2: Guozhang, just to close the book on the ListValue store thing,
> > I
> > >>>>>> fully
> > >>>>>>> agree it seems like overreach
> > >>>>>>> to expose/force this on users, especially if it's fully internal
> > >>>>>> today. But
> > >>>>>>> just to make sure we're on the same
> > >>>>>>> page here, you're still ok with this KIP fixing the API gap that
> > >> exists
> > >>>>>>> today, in which these stores cannot be
> > >>>>>>> customized by the user at all?
> > >>>>>>>
> > >>>>>>> In other words, after this KIP, the new behavior for the ListValue
> > >>>>>> store in
> > >>>>>>> a stream join will be:
> > >>>>>>>
> > >>>>>>> S1: First, check if the user passed in a `DSLStoreSuppliers` (or
> > >>>>>> whatever
> > >>>>>>> the name will be) to the
> > >>>>>>>  StreamJoined config object, and use that to obtain the
> > >>>>>>> KVStoreSupplier for this ListValue store
> > >>>>>>>
> > >>>>>>> S2: If none was provided, check if the user has set a default
> > >>>>>>> DSLStoreSuppliers via the new config,
> > >>>>>>>  and use that to get the KVStoreSupplier if so
> > >>>>>>>
> > >>>>>>> S3: If neither is set, fall back to the original logic as it is
> > >> today,
> > >>>>>>> which is to pass in a KVStoreSupplier
> > >>>>>>>  that is hard-coded to be either RocksDB or InMemory,
> > based on
> > >>>>>> what
> > >>>>>>> is returned for the #persistent
> > >>>>>>>  API by the StreamJoined's WindowStoreSupplier
> > >>>>>>>
> > >>>>>>> Does that sound right? We can clarify this further in the KIP if
> > need
> > >>>>>> be
> > >>>>>>>
> > >>>>>>> On Thu, Jul 27, 2023 at 10:48 AM Guozhang Wang <
> > >>>>>> guozhang.wang...@gmail.com>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hi all,
> > >>>>>>>>
> > >>>>>>>> Like Almog's secretary as well! Just following up on that index:
> > >>>>>>>>
> > >>>>>>>> A2: I'm also happy without introducing versioned KV in this KIP
> > as I
> > >>>>>>>> would envision it to be introduced as new params into the
> > >>>>>>>> KeyValuePluginParams in the future. And just to clarify on

Re: [VOTE] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-11-18 Thread Guozhang Wang
Thanks Alieh,

I read through the wiki page and the DISCUSS thread, all LGTM except a
minor thing in javadoc:

"The query returns the records with a global ascending order of keys.
The records with the same key are ordered based on their insertion
timestamp in ascending order. Both the global and partial ordering are
modifiable with the corresponding methods defined for the class."

Since this KIP is only for a single key, there's no key ordering but
only timestamp ordering right? Maybe the javadoc can be updated
accordingly.

Otherwise, LGTM.
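
To make the javadoc point concrete: with a single key, the timestamp is the only dimension left to sort on. A minimal sketch in plain Java, assuming a hypothetical `VersionedValue` type that stands in for one version of the key's value (it is not the Kafka Streams class used by the KIP):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for one version of a single key's value; not a Kafka Streams class.
final class VersionedValue {
    final String value;
    final long timestamp;

    VersionedValue(String value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

final class SingleKeyOrdering {

    // All versions belong to the same key, so only the timestamp order can be chosen.
    static List<VersionedValue> byTimestamp(List<VersionedValue> versions, boolean ascending) {
        Comparator<VersionedValue> cmp = Comparator.comparingLong(v -> v.timestamp);
        List<VersionedValue> sorted = new ArrayList<>(versions);
        sorted.sort(ascending ? cmp : cmp.reversed());
        return sorted;
    }

    public static void main(String[] args) {
        List<VersionedValue> versions = List.of(
            new VersionedValue("b", 20L),
            new VersionedValue("a", 10L),
            new VersionedValue("c", 30L));
        for (VersionedValue v : byTimestamp(versions, true)) {
            System.out.println(v.timestamp + " -> " + v.value);
        }
    }
}
```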

On Fri, Nov 17, 2023 at 2:36 AM Alieh Saeedi
 wrote:
>
> Hi all,
> Following my recent message in the discussion thread, I am opening the
> voting for KIP-968. Thanks for your votes in advance.
>
> Cheers,
> Alieh


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2023-11-18 Thread Guozhang Wang
Hi Rohan,

I took another look at the updated wiki page and do not have any major
questions. Regarding returning a plugin object v.s. configuring a
plugin object, I do not have a strong opinion except that the latter
seems more consistent with existing patterns. Just curious, any other
motivations to go with the latter from you?


Guozhang

On Thu, Nov 9, 2023 at 11:19 PM Rohan Desai  wrote:
>
> Thanks for the feedback so far! I think pretty much all of it is
> reasonable. I'll reply to it inline:
>
> > 1. All the API logic is granular at the Task level, except the
> previousOwnerForPartition func. I’m not clear what’s the motivation behind
> it, does our controller also want to change how the partitions->tasks
> mapping is formed?
> You're right that this is out of place. I've removed this method as it's
> not needed by the task assignor.
>
> > 2. Just on the API layering itself: it feels a bit weird to have the
> three built-in functions (defaultStandbyTaskAssignment etc) sitting in the
> ApplicationMetadata class. If we consider them as some default util
> functions, how about moving those into their own static util
> methods to separate from the ApplicationMetadata “fact objects” ?
> Agreed. Updated in the latest revision of the kip. These have been moved to
> TaskAssignorUtils
>
> > 3. I personally prefer `NodeAssignment` to be a read-only object
> containing the decisions made by the assignor, including the
> requestFollowupRebalance flag. For manipulating the half-baked results
> inside the assignor itself, maybe we can just be flexible to let users use
> whatever structs / their own classes even, if they like. WDYT?
> Agreed. Updated in the latest version of the kip.
>
> > 1. For the API, thoughts on changing the method signature to return a
> (non-Optional) TaskAssignor? Then we can either have the default
> implementation return new HighAvailabilityTaskAssignor or just have a
> default implementation class that people can extend if they don't want to
> implement every method.
> Based on some other discussion, I actually decided to get rid of the plugin
> interface, and instead use config to specify individual plugin behaviour.
> So the method you're referring to is no longer part of the proposal.
>
> > 3. Speaking of ApplicationMetadata, the javadoc says it's read only but
> there are methods that return void on it? It's not totally clear to me how
> that interface is supposed to be used by the assignor. It'd be nice if we
> could flip that interface such that it becomes part of the output instead
> of an input to the plugin.
> I've moved those methods to a util class. They're really utility methods
> the assignor might want to call to do some default or optimized assignment
> for some cases like rack-awareness.
>
> > 4. We should consider wrapping UUID in a ProcessID class so that we
> control
> the interface (there are a few places where UUID is directly used).
> I like it. Updated the proposal.
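
For reference, wrapping the UUID could look roughly like the sketch below. The class name follows the thread; the constructor, accessor, and the choice to implement Comparable are assumptions for illustration, not the proposal's actual signatures:

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical wrapper: callers depend on ProcessID, so the underlying UUID
// representation can change later without touching the public interface.
final class ProcessID implements Comparable<ProcessID> {
    private final UUID id;

    ProcessID(UUID id) {
        this.id = Objects.requireNonNull(id, "id");
    }

    UUID id() {
        return id;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ProcessID && id.equals(((ProcessID) o).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public int compareTo(ProcessID other) {
        return id.compareTo(other.id);
    }

    @Override
    public String toString() {
        return "ProcessID{" + id + "}";
    }

    public static void main(String[] args) {
        System.out.println(new ProcessID(UUID.randomUUID()));
    }
}
```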
>
> > 5. What does NodeState#newAssignmentForNode() do? I thought the point was
> for the plugin to make the assignment? Is that the result of the default
> logic?
> It doesn't need to be part of the interface. I've removed it.
>
> > re 2/6:
>
> I generally agree with these points, but I'd rather hash that out in a PR
> than in the KIP review, as it'll be clearer what gets used how. It seems to
> me (committers please correct me if I'm wrong) that as long as we're on the
> same page about what information the interfaces are returning, that's ok at
> this level of discussion.
>
> On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai  wrote:
>
> > Hello All,
> >
> > I'd like to start a discussion on KIP-924 (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams)
> > which proposes an interface to allow users to plug into the streams
> > partition assignor. The motivation section in the KIP goes into some more
> > detail on why we think this is a useful addition. Thanks in advance for
> > your feedback!
> >
> > Best Regards,
> >
> > Rohan
> >
> >


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2023-11-09 Thread Guozhang Wang
Hello Rohan,

Thanks for the KIP! Overall it looks very nice. Just some quick thoughts:

1. All the API logic is granular at the Task level, except the
previousOwnerForPartition func. I’m not clear what’s the motivation
behind it, does our controller also want to change how the
partitions->tasks mapping is formed?

2. Just on the API layering itself: it feels a bit weird to have the
three built-in functions (defaultStandbyTaskAssignment etc) sitting in
the ApplicationMetadata class. If we consider them as some default
util functions, how about moving those into their own
static util methods to separate from the ApplicationMetadata “fact
objects” ?

3. I personally prefer `NodeAssignment` to be a read-only object
containing the decisions made by the assignor, including the
requestFollowupRebalance flag. For manipulating the half-baked results
inside the assignor itself, maybe we can just be flexible to let users
use whatever structs / their own classes even, if they like. WDYT?
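
To sketch what I mean in 3: the assignor works on whatever mutable scratch structure it likes and only freezes the decisions into a read-only object at the end. All names below (`ReadOnlyNodeAssignment`, `RoundRobinAssignorSketch`, string node and task ids) are hypothetical and simplified, not the KIP's interfaces:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical read-only result type; the actual KIP-924 NodeAssignment may differ.
final class ReadOnlyNodeAssignment {
    private final Set<String> taskIds;
    private final boolean requestFollowupRebalance;

    ReadOnlyNodeAssignment(Set<String> taskIds, boolean requestFollowupRebalance) {
        this.taskIds = Set.copyOf(taskIds); // unmodifiable snapshot of the decision
        this.requestFollowupRebalance = requestFollowupRebalance;
    }

    Set<String> taskIds() { return taskIds; }

    boolean requestFollowupRebalance() { return requestFollowupRebalance; }
}

final class RoundRobinAssignorSketch {

    static Map<String, ReadOnlyNodeAssignment> assign(Set<String> nodes, Set<String> tasks) {
        // Scratch state: an ordinary mutable map, private to the assignor.
        Map<String, Set<String>> scratch = new HashMap<>();
        List<String> orderedNodes = new ArrayList<>(new TreeSet<>(nodes));
        for (String node : orderedNodes) {
            scratch.put(node, new HashSet<>());
        }
        int next = 0;
        for (String task : new TreeSet<>(tasks)) {
            if (orderedNodes.isEmpty()) {
                break; // nothing to assign to
            }
            scratch.get(orderedNodes.get(next++ % orderedNodes.size())).add(task);
        }
        // Freeze the half-baked results into read-only objects.
        Map<String, ReadOnlyNodeAssignment> result = new HashMap<>();
        for (Map.Entry<String, Set<String>> e : scratch.entrySet()) {
            result.put(e.getKey(), new ReadOnlyNodeAssignment(e.getValue(), false));
        }
        return Map.copyOf(result);
    }

    public static void main(String[] args) {
        Map<String, ReadOnlyNodeAssignment> out =
            assign(Set.of("n1", "n2"), Set.of("0_0", "0_1", "0_2"));
        out.forEach((node, a) -> System.out.println(node + " -> " + a.taskIds()));
    }
}
```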

Thanks,
Guozhang

On Tue, Nov 7, 2023 at 12:04 PM Rohan Desai  wrote:
>
> Hello All,
>
> I'd like to start a discussion on KIP-924 (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams)
> which proposes an interface to allow users to plug into the streams
> partition assignor. The motivation section in the KIP goes into some more
> detail on why we think this is a useful addition. Thanks in advance for
> your feedback!
>
> Best Regards,
>
> Rohan


Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-29 Thread Guozhang Wang
 this:
> >>>>
> >>>> 1. ALOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, IQ
> >>>> reads from DB.
> >>>> 2. ALOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from
> >>>> WriteBatch/DB. Flush on error (see note below).
> >>>> 3. EOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, IQ
> >>>> reads from DB. Wipe state on error.
> >>>> 4. EOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from
> >>>> WriteBatch/DB.
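
The four combinations above can be written down as a small lookup. This is only a sketch of the table as listed: the enum and class names are hypothetical (not Kafka Streams types), and "unspecified" marks the cases where the list states no error behaviour:

```java
// Hypothetical names; this encodes only what the four-item list above states.
enum ProcessingMode { ALOS, EOS }

enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

final class StoreBehavior {
    final boolean usesWriteBatch; // false means processing writes direct-to-DB
    final String onError;         // "flush", "wipe", or "unspecified"

    private StoreBehavior(boolean usesWriteBatch, String onError) {
        this.usesWriteBatch = usesWriteBatch;
        this.onError = onError;
    }

    static StoreBehavior of(ProcessingMode mode, IsolationLevel level) {
        boolean writeBatch = level == IsolationLevel.READ_COMMITTED;
        String onError;
        if (mode == ProcessingMode.ALOS && writeBatch) {
            onError = "flush";        // case 2: flush the WriteBatch on error
        } else if (mode == ProcessingMode.EOS && !writeBatch) {
            onError = "wipe";         // case 3: wipe state on error
        } else {
            onError = "unspecified";  // cases 1 and 4: nothing stated in the list
        }
        return new StoreBehavior(writeBatch, onError);
    }

    public static void main(String[] args) {
        for (ProcessingMode m : ProcessingMode.values()) {
            for (IsolationLevel l : IsolationLevel.values()) {
                StoreBehavior b = of(m, l);
                System.out.println(m + "/" + l + ": writeBatch=" + b.usesWriteBatch
                    + ", onError=" + b.onError);
            }
        }
    }
}
```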
> >>>>
> >>>> I believe the feature is important enough that we will see good
> >>>> adoption even without changing the default. In 4.0, when we have seen
> >>>> this being adopted and is battle-tested, we make READ_COMMITTED the
> >>>> default for EOS, or even READ_COMMITTED always the default, depending
> >>>> on our experiences. And we could add a clever implementation of
> >>>> READ_UNCOMMITTED with WriteBatches later.
> >>>>
> >>>> The only smell here is that `default.state.isolation.level` wouldn't
> >>>> be purely an IQ setting, but it would also (slightly) change the
> >>>> behavior of the processing, but that seems unavoidable as long as we
> >>>> haven't solved READ_UNCOMMITTED IQ with WriteBatches.
> >>>>
> >>>> Minor: As for Bruno's point 4, I think if we are concerned about this
> >>>> behavior (we don't necessarily have to be, because it doesn't violate
> >>>> ALOS guarantees as far as I can see), we could make
> >>>> ALOS/READ_COMMITTED more similar to ALOS/READ_UNCOMMITTED by flushing
> >>>> the WriteBatch on error (obviously, only if we have a chance to do
> >>>> that).
> >>>>
> >>>> Cheers,
> >>>> Lucas
> >>>>
> >>>> On Mon, Oct 16, 2023 at 12:19 PM Nick Telford 
> >>>> wrote:
> >>>>>
> >>>>> Hi Guozhang,
> >>>>>
> >>>>> The KIP as it stands introduces a new configuration,
> >>>>> default.state.isolation.level, which is independent of
> >> processing.mode.
> >>>>> It's intended that this new configuration be used to configure a
> >> global
> >>>> IQ
> >>>>> isolation level in the short term, with a future KIP introducing the
> >>>>> capability to change the isolation level on a per-query basis,
> >> falling
> >>>> back
> >>>>> to the "default" defined by this config. That's why I called it
> >>>> "default",
> >>>>> for future-proofing.
> >>>>>
> >>>>> However, it currently includes the caveat that READ_UNCOMMITTED is
> >> not
> >>>>> available under EOS. I think this is the coupling you are alluding
> >> to?
> >>>>>
> >>>>> This isn't intended to be a restriction of the API, but is currently
> >> a
> >>>>> technical limitation. However, after discussing with some users about
> >>>>> use-cases that would require READ_UNCOMMITTED under EOS, I'm
> >> inclined to
> >>>>> remove that clause and put in the necessary work to make that
> >> combination
> >>>>> possible now.
> >>>>>
> >>>>> I currently see two possible approaches:
> >>>>>
> >>>>> 1. Disable TX StateStores internally when the IsolationLevel is
> >>>>> READ_UNCOMMITTED and the processing.mode is EOS. This is more
> >>>> difficult
> >>>>> than it sounds, as there are many assumptions being made
> >> throughout
> >>>> the
> >>>>> internals about the guarantees StateStores provide. It would
> >>>> definitely add
> >>>>> a lot of extra "if (read_uncommitted && eos)" branches,
> >> complicating
> >>>>> maintenance and testing.
> >>>>> 2. Invest the time *now* to make READ_UNCOMMITTED of EOS
> >> StateStores
> >>>>> possible. I have some ideas on how this could be achieved, but
> >> they
> >>>> would
> >>>>> need testing and could introduce some additional issues. The
> >> benefit
> >>>> of
> >>>>> this approach is that it 

Re: [ANNOUNCE] New Kafka PMC Member: Satish Duggana

2023-10-28 Thread Guozhang Wang
Congratulations Satish!

On Sat, Oct 28, 2023 at 12:59 AM Luke Chen  wrote:
>
> Congrats Satish!
>
> Luke
>
> On Sat, Oct 28, 2023 at 11:16 AM ziming deng 
> wrote:
>
> > Congratulations Satish!
> >
> > > On Oct 27, 2023, at 23:03, Jun Rao  wrote:
> > >
> > > Hi, Everyone,
> > >
> > > Satish Duggana has been a Kafka committer since 2022. He has been very
> > > instrumental to the community since becoming a committer. It's my
> > pleasure
> > > to announce that Satish is now a member of Kafka PMC.
> > >
> > > Congratulations Satish!
> > >
> > > Jun
> > > on behalf of Apache Kafka PMC
> >
> >


Re: [VOTE] KIP-988 Streams StandbyUpdateListener

2023-10-17 Thread Guozhang Wang
+1 from me.

On Mon, Oct 16, 2023 at 1:56 AM Lucas Brutschy
 wrote:
>
> Hi,
>
> thanks again for the KIP!
>
> +1 (binding)
>
> Cheers,
> Lucas
>
>
>
> On Sun, Oct 15, 2023 at 9:13 AM Colt McNealy  wrote:
> >
> > Hello there,
> >
> > I'd like to call a vote on KIP-988 (co-authored by my friend and colleague
> > Eduwer Camacaro). We are hoping to get it in before the 3.7.0 release.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener
> >
> > Cheers,
> > Colt McNealy
> >
> > *Founder, LittleHorse.dev*


Re: [VOTE] KIP-985 Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-17 Thread Guozhang Wang
Seems my previous msg was sent to the wrong recipient, just resending..

On Fri, Oct 13, 2023 at 7:06 PM Guozhang Wang
 wrote:
>
> Thanks Hanyu. I made a pass on the KIP and read through the DISCUSS
> thread. Do not have any comments. +1
>
> On Fri, Oct 13, 2023 at 9:29 AM Hanyu (Peter) Zheng
>  wrote:
> >
> > Hello everyone,
> >
> > I would like to start a vote for KIP-985, which adds reverseRange and
> > reverseAll queries over kv-store in IQv2.
> >
> > Sincerely,
> > Hanyu
> >
> > On Fri, Oct 13, 2023 at 9:15 AM Hanyu (Peter) Zheng 
> > wrote:
> >
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-985:+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2
> > >
> > >
> >
> >


Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-14 Thread Guozhang Wang
Thanks for the summary, that looks good to me.

Guozhang

On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy  wrote:
>
> Hello there!
>
> Thanks everyone for the comments. There's a lot of back-and-forth going on,
> so I'll do my best to summarize what everyone's said in TLDR format:
>
> 1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`, and do similarly
> for the other methods.
> 2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
> 3. Remove the `earliestOffset` parameter for performance reasons.
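
Put together, the three points would give a listener of roughly the shape below. This is a simplified, hypothetical sketch in standalone Java: the parameter lists are assumptions for illustration (the KIP defines the real ones), and only two of the callbacks are shown:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified shape; the actual KIP-988 interface has its own parameters.
interface StandbyUpdateListenerSketch {

    // Point 2: both reasons are kept.
    enum SuspendReason { MIGRATED, PROMOTED }

    // Point 1: renamed from onStandbyUpdateStart(). Point 3: no earliestOffset parameter.
    void onUpdateStart(String storeName, long startingOffset);

    void onUpdateSuspended(String storeName, long storeOffset, SuspendReason reason);
}

// Example listener that only records what it was told.
final class RecordingListener implements StandbyUpdateListenerSketch {
    final List<String> events = new ArrayList<>();

    @Override
    public void onUpdateStart(String storeName, long startingOffset) {
        events.add("start:" + storeName + "@" + startingOffset);
    }

    @Override
    public void onUpdateSuspended(String storeName, long storeOffset, SuspendReason reason) {
        events.add("suspended:" + storeName + "@" + storeOffset + ":" + reason);
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        listener.onUpdateStart("counts", 5L);
        listener.onUpdateSuspended("counts", 42L, SuspendReason.PROMOTED);
        listener.events.forEach(System.out::println);
    }
}
```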
>
> If that's all fine with everyone, I'll update the KIP and we—well, mostly
> Edu (:  —will open a PR.
>
> Cheers,
> Colt McNealy
>
> *Founder, LittleHorse.dev*
>
>
> On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro 
> wrote:
>
> > Hello everyone,
> >
> > Thanks for all your feedback for this KIP!
> >
> > I think that the key to choosing proper names for this API is understanding
> > the terms used inside the StoreChangelogReader. Currently, this class has
> > two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my opinion,
> > using StandbyUpdateListener for the interface fits better on these terms.
> > Same applies for onUpdateStart/Suspended.
> >
> > StoreChangelogReader uses "the same mechanism" for active task restoration
> > and standby task updates, but this is an implementation detail. Under
> > normal circumstances (no rebalances or task migrations), the changelog
> > reader will be in STANDBY_UPDATING, which means it will be updating standby
> > tasks as long as there are new records in the changelog topic. That's why I
> > prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't 100%
> > align with StateRestoreListener, but either one is fine.
> >
> > Edu
> >
> > On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang 
> > wrote:
> >
> > > Hello Colt,
> > >
> > > Thanks for writing the KIP! I have read through the updated KIP and
> > > overall it looks great. I only have minor naming comments (well,
> > > isn't naming the least boring stuff to discuss and that takes the
> > > most of the time for KIPs :P):
> > >
> > > 1. I tend to agree with Sophie regarding whether or not to include
> > > "Standby" in the functions of "onStandbyUpdateStart/Suspended", since
> > > it is also more consistent with the functions of
> > > "StateRestoreListener" where we do not name it as
> > > "onStateRestoreState" etc.
> > >
> > > 2. I know in community discussions we sometimes say "a standby is
> > > promoted to active", but in the official code / java docs we did not
> > > have a term of "promotion", since what the code does is really recycle
> > > the task (while keeping its state stores open), and create a new
> > > > active task that takes in the recycled state stores and just changes
> > > the other fields like task type etc. After thinking about this for a
> > > bit, I tend to feel that "promoted" is indeed a better name for user
> > > facing purposes while "recycle" is more of a technical detail inside
> > > the code and could be abstracted away from users. So I feel keeping
> > > the name "PROMOTED" is fine.
> > >
> > > 3. Regarding "earliestOffset", it does feel like we cannot always
> > > avoid another call to the Kafka API. And on the other hand, I also
> > > tend to think that such bookkeeping may be better done at the app
> > > level than from the Streams' public API level. I.e. the app could keep
> > > > a "first ever starting offset" per "topic-partition-store" key, and
> > > > when we have a rolling restart and hence some standby task keeps
> > > > "jumping" from one client to another via task assignment, the app
> > > > would update this value just once, when it finds the
> > > > "topic-partition-store" was never triggered before. What do you
> > > think?
> > >
> > > 4. I do not have a strong opinion either, but what about
> > "onBatchUpdated" ?
> > >
> > >
> > > Guozhang
> > >
> > > On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy 
> > wrote:
> > > >
> > > > > Sophie,
> > > >
> > > > Thank you very much for such a detailed review of the KIP. It might
> > > > actually be longer than the original KIP in the first place!
> > > >
> > > > 1. Ack'ed and fixe

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-13 Thread Guozhang Wang
Hello Nick,

First of all, thanks a lot for the great effort you've put in driving
this KIP! I really like it coming through finally, as many people in
the community have raised this. At the same time I honestly feel a bit
ashamed for not putting enough of my time supporting it and pushing it
through the finish line (you raised this KIP almost a year ago).

I have only skimmed the DISCUSS thread so far, and I'm not sure I've
fully digested all the bullet points. But with the goal of helping to
take it over the finish line in mind, I'd like to share thoughts off
the top of my head only on point #4 above, which I feel may be the
main hurdle keeping the current KIP from reaching consensus now.

The general question I asked myself is whether we want to couple "IQ
reading mode" with "processing mode". Technically I tend to agree with
you that it feels like a bug if a single user chose "EOS" for
processing mode while choosing "read uncommitted" for IQ reading mode.
At the same time, I wonder whether there could be two different people
(or even two teams), one using the Streams API to build the app and
the other using the IQ API to query the running state of the app. I
know this is less of a technical question and more of a design one,
but if that could ever be the case, the person using the IQ API may
know about the risks of using read uncommitted and still choose it in
favor of performance, regardless of whether the underlying stream
processing mode configured by the other person is EOS or not. In that
regard, I'm leaning towards a "leave the door open, and close it later
if we find it's a bad idea" approach, with a configuration that we can
potentially deprecate, rather than "shut the door, clean for
everyone". More specifically, we would allow the processing mode and
IQ read mode to be decoupled, and if we find that there are no such
cases as I speculated above, or people start complaining a lot, we can
still enforce coupling them.
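
To make the two options concrete, here is a minimal sketch of "coupled"
versus "decoupled" resolution of the IQ read mode. Every name in it
(ProcessingMode, IqIsolation, IsolationResolver) is hypothetical and not
taken from the KIP or from Kafka Streams, and the EOS -> read committed
mapping is only an assumption about what coupling would mean:

```java
import java.util.Optional;

// All types below are hypothetical and exist only for this illustration.
enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE }

enum IqIsolation { READ_UNCOMMITTED, READ_COMMITTED }

final class IsolationResolver {
    private IsolationResolver() { }

    // Coupled: the processing mode alone decides how IQ reads behave.
    // Assumption: EOS implies READ_COMMITTED, ALOS implies READ_UNCOMMITTED.
    static IqIsolation coupled(ProcessingMode mode) {
        return mode == ProcessingMode.EXACTLY_ONCE
                ? IqIsolation.READ_COMMITTED
                : IqIsolation.READ_UNCOMMITTED;
    }

    // Decoupled: an explicit setting wins; without one, fall back to the
    // coupled default so existing behavior is unchanged.
    static IqIsolation decoupled(ProcessingMode mode, Optional<IqIsolation> explicit) {
        return explicit.orElseGet(() -> coupled(mode));
    }
}
```

With the decoupled variant, enforcing coupling later would amount to
ignoring (or rejecting) the explicit setting.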

Again, just my 2c here. Thanks again for the great patience and
diligence on this KIP.


Guozhang



On Fri, Oct 13, 2023 at 8:48 AM Nick Telford  wrote:
>
> Hi Bruno,
>
> 4.
> I'll hold off on making that change until we have a consensus as to what
> configuration to use to control all of this, as it'll be affected by the
> decision on EOS isolation levels.
>
> 5.
> Done. I've chosen "committedOffsets".
>
> Regards,
> Nick
>
> On Fri, 13 Oct 2023 at 16:23, Bruno Cadonna  wrote:
>
> > Hi Nick,
> >
> > 1.
> > Yeah, you are probably right that it does not make too much sense.
> > Thanks for the clarification!
> >
> >
> > 4.
> > Yes, sorry for the back and forth, but I think for the sake of the KIP
> > it is better to leave the ALOS behavior as it is for now due to the
> > possible issues you would run into. Maybe we can find a solution in the
> > future. Now the question returns to whether we really need
> > default.state.isolation.level. Maybe the config could be the feature
> > flag Sophie requested.
> >
> >
> > 5.
> > There is a guideline in Kafka not to use the get prefix for getters (at
> > least in the public API). Thus, could you please rename
> >
> > getCommittedOffset(TopicPartition partition) ->
> > committedOffsetFor(TopicPartition partition)
> >
> > You can also propose an alternative to committedOffsetFor().
> >
> >
> > Best,
> > Bruno
> >
> >
> > On 10/13/23 3:21 PM, Nick Telford wrote:
> > > Hi Bruno,
> > >
> > > Thanks for getting back to me.
> > >
> > > 1.
> > > I think this should be possible. Are you thinking of the situation where
> > a
> > > user may downgrade to a previous version of Kafka Streams? In that case,
> > > sadly, the RocksDBStore would get wiped by the older version of Kafka
> > > Streams anyway, because that version wouldn't understand the extra column
> > > family (that holds offsets), so the missing Position file would
> > > automatically get rebuilt when the store is rebuilt from the changelog.
> > > Are there other situations than downgrade where a transactional store
> > could
> > > be replaced by a non-transactional one? I can't think of any.
> > >
> > > 2.
> > > Ahh yes, the Test Plan - my Kryptonite! This section definitely needs to
> > be
> > > fleshed out. I'll work on that. How much detail do you need?
> > >
> > > 3.
> > > See my previous email discussing this.
> > >
> > > 4.
> > > Hmm, this is an interesting point. Are you suggesting that under ALOS
> > > READ_COMMITTED should not be supported?
> > >
> > > Regards,
> > > Nick
> > >
> > > On Fri, 13 Oct 2023 at 13:52, Bruno Cadonna  wrote:
> > >
> > >> Hi Nick,
> > >>
> > >> I think the KIP is converging!
> > >>
> > >>
> > >> 1.
> > >> I am wondering whether it makes sense to write the position file during
> > >> close as we do for the checkpoint file, so that in case the state store
> > >> is replaced with a non-transactional state store the non-transactional
> > >> state store finds the position file. I think, this is not strictly
> > >> needed, but would be a 

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-13 Thread Guozhang Wang
Hello Colt,

Thanks for writing the KIP! I have read through the updated KIP and
overall it looks great. I only have minor naming comments (well,
isn't naming the least boring stuff to discuss, and the thing that
takes most of the time for KIPs :P):

1. I tend to agree with Sophie regarding whether or not to include
"Standby" in the functions of "onStandbyUpdateStart/Suspended", since
it is also more consistent with the functions of
"StateRestoreListener" where we do not name it as
"onStateRestoreState" etc.

2. I know in community discussions we sometimes say "a standby is
promoted to active", but in the official code / java docs we did not
have a term of "promotion", since what the code does is really recycle
the task (while keeping its state stores open), and create a new
active task that takes in the recycled state stores and just changes
the other fields like task type etc. After thinking about this for a
bit, I tend to feel that "promoted" is indeed a better name for user
facing purposes while "recycle" is more of a technical detail inside
the code and could be abstracted away from users. So I feel keeping
the name "PROMOTED" is fine.

3. Regarding "earliestOffset", it does feel like we cannot always
avoid another call to the Kafka API. And on the other hand, I also
tend to think that such bookkeeping may be better done at the app
level than from the Streams' public API level. I.e. the app could keep
a "first ever starting offset" per "topic-partition-store" key, and
when we have a rolling restart and hence some standby task keeps
"jumping" from one client to another via task assignment, the app
would update this value just once, when it finds the
"topic-partition-store" was never triggered before. What do you
think?

4. I do not have a strong opinion either, but what about "onBatchUpdated" ?
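
For point 3, the app-level bookkeeping could look roughly like the
sketch below. FirstOffsetTracker and its methods are hypothetical names,
not part of the Streams API, and the in-memory map only stands in for
wherever the app would actually keep this value (it would need to be
shared across clients to survive a task moving between instances):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical app-level helper; not part of the Kafka Streams API.
final class FirstOffsetTracker {
    // Keyed by "topic-partition-store"; the key format is an assumption.
    private final Map<String, Long> firstOffsets = new ConcurrentHashMap<>();

    static String key(String topic, int partition, String storeName) {
        return topic + "-" + partition + "-" + storeName;
    }

    // Records the starting offset only the first time a key is seen and
    // returns the value that is retained for that key.
    long recordIfFirst(String topic, int partition, String storeName, long startingOffset) {
        Long previous = firstOffsets.putIfAbsent(key(topic, partition, storeName), startingOffset);
        return previous == null ? startingOffset : previous;
    }

    // Number of offsets between the first recorded start and the given offset.
    long progressSinceFirst(String topic, int partition, String storeName, long currentOffset) {
        Long first = firstOffsets.get(key(topic, partition, storeName));
        if (first == null) {
            throw new IllegalStateException("no starting offset recorded for this key");
        }
        return currentOffset - first;
    }
}
```

The app would call recordIfFirst() from the update-start callback and
could then compute the already-loaded amount itself, without the
listener having to supply an earliest offset.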


Guozhang

On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy  wrote:
>
> Sophie,
>
> Thank you very much for such a detailed review of the KIP. It might
> actually be longer than the original KIP in the first place!
>
> 1. Ack'ed and fixed.
>
> 2. Correct, this is a confusing passage and requires context:
>
> One thing on our list of TODO's regarding reliability is to determine how
> to configure `session.timeout.ms`. In our Kubernetes Environment, an
> instance of our Streams App can be terminated, restarted, and get back into
> the "RUNNING" Streams state in about 20 seconds. We have two options here:
> a) set session.timeout.ms to 30 seconds or so, and deal with 20 seconds of
> unavailability for affected partitions, but avoid shuffling Tasks; or b)
> set session.timeout.ms to a low value, such as 6 seconds (
> heartbeat.interval.ms of 2000), and reduce the unavailability window during
> a rolling bounce but incur an "extra" rebalance. There are several
> different costs to a rebalance, including the shuffling of standby tasks.
> JMX metrics are not fine-grained enough to give us an accurate picture of
> what's going on with the whole Standby Task Shuffle Dance. I hypothesize
> that the Standby Update Listener might help us clarify just how the
> shuffling actually (not theoretically) works, which will help us make a
> more informed decision about the session timeout config.
>
> If you think this is worth putting in the KIP, I'll polish it and do so;
> else, I'll remove the current half-baked explanation.
>
> 3. Overall, I agree with this. In our app, each Task has only one Store to
> reduce the number of changelog partitions, so I sometimes forget the
> distinction between the two concepts, as reflected in the KIP (:
>
> 3a. I don't like the word "Restore" here, since Restoration refers to an
> Active Task getting caught up in preparation to resume processing.
> `StandbyUpdateListener` is fine by me; I have updated the KIP. I am a
> native Python speaker so I do prefer shorter names anyways (:
>
> 3b1. +1 to removing the word 'Task'.
>
> 3b2. I like `onUpdateStart()`, but with your permission I'd prefer
> `onStandbyUpdateStart()` which matches the name of the Interface
> "StandbyUpdateListener". (the python part of me hates this, however)
>
> 3b3. Going back to question 2), `earliestOffset` was intended to allow us
> to more easily calculate the amount of state _already loaded_ in the store
> by subtracting (startingOffset - earliestOffset). This would help us see
> how much inefficiency is introduced in a rolling restart—if we end up going
> from a situation with an up-to-date standby before the restart, and then
> after the whole restart, the Task is shuffled onto an instance where there
> is no previous state, then that is expensive. However, if the final
> shuffling results in the Task back on an instance with a lot of pre-built
> state, it's not expensive.
>
> If a call over the network is required to determine the earliestOffset,
> then this is a "hard no-go" for me, and we will remove it (I'll have to
> check with Eduwer as he is close to having a working implementation). I
> think we can probably determine 

Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-10-13 Thread Guozhang Wang
Thanks Alieh for the KIP, as well as a nice summary of all the
discussions! Just my 2c regarding your open questions:

1. I just checked KIP-889
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores)
and we used "VersionedRecord<V> get(K key, long asOfTimestamp);", so I
feel that being consistent with this API is better than being
consistent with "WindowKeyQuery"?

3. I agree with Matthias that naming is always tricky, and I also tend
to be consistent with what we already have (even if in retro it may
not be the best idea :P and if that was really becoming a complaint,
we would change it across the board in one shot as well later).
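
On point 1, for anyone less familiar with the "as of" semantics behind
that method: the lookup returns the version with the largest timestamp
not exceeding the bound. A toy in-memory model is below. ToyVersionedStore
is a hypothetical class, not Kafka's VersionedKeyValueStore; it returns
the plain value instead of a VersionedRecord and ignores tombstones and
history retention:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model for illustration only; not Kafka's versioned store.
final class ToyVersionedStore<K, V> {
    // Per key, versions sorted by the timestamp at which they became valid.
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();

    void put(K key, V value, long timestamp) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Returns the value whose timestamp is the largest one <= asOfTimestamp,
    // or null if the key has no version that old.
    V get(K key, long asOfTimestamp) {
        TreeMap<Long, V> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, V> entry = history.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }
}
```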

Guozhang

On Wed, Oct 11, 2023 at 9:12 PM Matthias J. Sax  wrote:
>
> Thanks for the update!
>
>
>
> Some thoughts about changes you made and open questions you raised:
>
>
> 10) About asOf vs until: I was just looking into `WindowKeyQuery`,
> `WindowRangeQuery` and also `ReadOnlyWindowStore` interfaces. For those,
> we use "timeFrom" and "timeTo", so it seems best to actually use
> `to(Instant toTime)` to keep the naming consistent across the board?
>
> If yes, we should also do `from (Instant fromTime)` and use getters
> `fromTime()` and `toTime()` -- given that it's range bounds it seems
> acceptable to me, to diverge a little bit from KIP-960 `asOfTimestamp()`
> -- but we could also rename it to `asOfTime()`? -- Given that we
> strongly type with `Instant` I am not worried about semantic ambiguity.
>
>
>
> 20) About throwing a NPE when time bounds are `null` -- why? (For the
> key it makes sense as is mandatory to have a key.) Could we not
> interpret `null` as "no bound". We did KIP-941 to add `null` for
> open-ended `RangeQueries`, so I am wondering if we should just stick to
> the same semantics?
>
> Cf
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-941%3A+Range+queries+to+accept+null+lower+and+upper+bounds
>
>
>
> 30) About the class naming. That's always tricky, and I am not married
> to my proposal. I agree with Bruno that the other suggested names are
> not really better. -- The underlying idea was, to get some "consistent"
> naming across the board.
>
> Existing `KeyQuery`
> New `VersionedKeyQuery` (KIP-960; we add a prefix)
> New `MultiVersionKeyQuery` (this KIP; extend the prefix with a pre-prefix)
>
> Existing `RangeQuery`
> New `MultiVersionRangeQuery` (KIP-969; add same prefix as above)
>
>
>
> 40) I am fine with not adding `range(from, to)` -- it was just an idea.
>
>
>
>
>
> Some more follow up question:
>
> 50) You propose to add a new constructor and getter to `VersionedRecord`
> -- I am wondering if this implies that `validTo` is optional because the
> existing constructor is not deprecated? -- Also, what happens if
> `validTo` is not set and `validTo()` is called? Or do we intend to make
> `validTo` mandatory?
>
> Maybe this question can only be answered when working on the code, but I
> am wondering if we should make `validTo` mandatory or not... And what
> the "blast radius" of changing `VersionedRecord` will be in general. Do
> you have already some POC PR that we could look at to get some signals
> about this?
>
>
>
> 60) The new query class is defined to return
> `ValueIterator<VersionedRecord<V>>` -- while I like the idea to add
> `ValueIterator` in a generic way on the one hand, I am wondering if
> it might be better to change it, and enforce its usage (ie, return type)
> of `VersionedRecord` to improve type safety (type erasure is often a
> pain, and we could mitigate it this way).
>
> Btw: We actually do a similar thing for `KeyValueIterator`.
>
> Ie,
>
> public interface ValueIterator extends Iterator>
>
> and
>
> ValueAndTimestamp peek();
>
> This would imply that the return type of the new query is
> `ValueIterator` on the interface what seems simpler and more elegant?
>
> If we go with the change, I am also wondering if we need to find a
> better name for the new iterator class? Maybe `VersionIterator` or
> something like this?
>
> Of course it might limit the use of `ValueIterator` for other value
> types -- not sure if this a limitation that is prohibitive? My gut
> feeling is, that it should not be too limiting.
>
>
>
>
> 70) Do we really need the change in `VersionedKeyValueStore` and add a
> new method? In the end, the idea of IQv2 is to avoid exactly this... It
> was the main issue for IQv1, that the base interface of the store needed
> an update and thus all classes implementing the base interface, making
> it very cumbersome to add new query types. -- Of course, we need this
> new method on the actual implementation (as private method) that can
> be called from `query()` method, but adding it to the interface seems to
> defeat the purpose of IQv2.
>
> Note, for existing IQv2 query types that go against other stores, the
> public methods already existed when IQv2 was introduced, and thus the
> implementation of these query types just pragmatically re-used existing
> methods -- but it does not imply that new 

Re: [ANNOUNCE] New Kafka PMC Member: Justine Olshan

2023-09-22 Thread Guozhang Wang
Congratulations!

On Fri, Sep 22, 2023 at 8:44 PM Tzu-Li (Gordon) Tai  wrote:
>
> Congratulations Justine!
>
> On Fri, Sep 22, 2023, 19:25 Philip Nee  wrote:
>
> > Congrats Justine!
> >
> > On Fri, Sep 22, 2023 at 7:07 PM Luke Chen  wrote:
> >
> > > Hi, Everyone,
> > >
> > > Justine Olshan has been a Kafka committer since Dec. 2022. She has been
> > > very active and instrumental to the community since becoming a committer.
> > > It's my pleasure to announce that Justine is now a member of Kafka PMC.
> > >
> > > Congratulations Justine!
> > >
> > > Luke
> > > on behalf of Apache Kafka PMC
> > >
> >


Re: [ANNOUNCE] New committer: Lucas Brutschy

2023-09-21 Thread Guozhang Wang
Congratulations, Lucas!

On Thu, Sep 21, 2023 at 8:45 AM Bruno Cadonna  wrote:
>
> Hi all,
>
> The PMC of Apache Kafka is pleased to announce a new Kafka committer
> Lucas Brutschy.
>
> Lucas' major contributions are around Kafka Streams.
>
> Lucas significantly contributed to the state updater
> (https://issues.apache.org/jira/browse/KAFKA-10199) and he drives the
> implementation of the new threading model for Kafka Streams
> (https://issues.apache.org/jira/browse/KAFKA-15326).
>
> Lucas' contributions to KIP discussions and PR reviews are very thoughtful.
>
> Congratulations, Lucas!
>
> Thanks,
>
> Bruno (on behalf of the Apache Kafka PMC)


Re: [ANNOUNCE] New committer: Yash Mayya

2023-09-21 Thread Guozhang Wang
Congrats, Yash!

On Thu, Sep 21, 2023 at 8:28 AM Bruno Cadonna  wrote:
>
> Hi all,
>
> The PMC of Apache Kafka is pleased to announce a new Kafka committer
> Yash Mayya.
>
> Yash's major contributions are around Connect.
>
> Yash authored the following KIPs:
>
> KIP-793: Allow sink connectors to be used with topic-mutating SMTs
> KIP-882: Kafka Connect REST API configuration validation timeout
> improvements
> KIP-970: Deprecate and remove Connect's redundant task configurations
> endpoint
> KIP-980: Allow creating connectors in a stopped state
>
> Overall, Yash is known for insightful and friendly input to discussions
> and his high quality contributions.
>
> Congratulations, Yash!
>
> Thanks,
>
> Bruno (on behalf of the Apache Kafka PMC)


Re: [DISCUSS] KIP-962 Relax non-null key requirement in Kafka Streams

2023-08-06 Thread Guozhang Wang
I'm just thinking we can try to encourage users to migrate from XX to
XXWithKey in the docs, giving this as one good example that the latter
can help you distinguish different scenarios whereas the former
cannot.

On Fri, Aug 4, 2023 at 6:32 PM Matthias J. Sax  wrote:
>
> Guozhang,
>
> thanks for pointing out ValueJoinerWithKey. In the end, it's just a
> documentation change, ie, point out that the passed in key could be
> `null` and similar?
>
> -Matthias
>
>
> On 8/2/23 3:20 PM, Guozhang Wang wrote:
> > Thanks Florin for the writeup,
> >
> > One quick thing I'd like to bring up is that in KIP-149
> > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner)
> > we introduced ValueJoinerWithKey which is aimed to enhance
> > ValueJoiner. It would have a benefit for this KIP such that
> > implementers can distinguish "null-key" vs. "not-null-key but
> > null-value" scenarios.
> >
> > Hence I'd suggest we also include the semantic changes with
> > ValueJoinerWithKey, which can help distinguish these two scenarios,
> > and also document that if users apply ValueJoiner only, they may not
> > have this benefit, and hence we suggest users to use the former.
> >
> >
> > Guozhang
> >
> > On Mon, Jul 31, 2023 at 12:11 PM Florin Akermann
> >  wrote:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams


Re: [DISCUSS] KIP-962 Relax non-null key requirement in Kafka Streams

2023-08-02 Thread Guozhang Wang
Thanks Florin for the writeup,

One quick thing I'd like to bring up is that in KIP-149
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner)
we introduced ValueJoinerWithKey which is aimed to enhance
ValueJoiner. It would have a benefit for this KIP such that
implementers can distinguish "null-key" vs. "not-null-key but
null-value" scenarios.

Hence I'd suggest we also include the semantic changes with
ValueJoinerWithKey, which can help distinguish these two scenarios,
and also document that if users apply ValueJoiner only, they may not
have this benefit, and hence we suggest users to use the former.
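
To illustrate the difference, here is a small sketch. KeyAwareJoiner is
a local stand-in that mirrors the shape of ValueJoinerWithKey (apply
with a read-only key plus the two values) so the snippet compiles
without the Kafka dependency; the class names and the output strings
are made up for the example:

```java
// Local stand-in with the same shape as ValueJoinerWithKey's
// apply(readOnlyKey, value1, value2); declared here only for illustration.
interface KeyAwareJoiner<K, V1, V2, VR> {
    VR apply(K readOnlyKey, V1 value1, V2 value2);
}

final class NullAwareJoin {
    private NullAwareJoin() { }

    // In a left join the right-hand value is null in both scenarios, so a
    // joiner that only sees (left, right) cannot tell them apart.
    static final KeyAwareJoiner<String, String, String, String> JOINER =
        (key, left, right) -> {
            if (key == null) {
                return "null-key:" + left;              // no lookup was possible
            }
            if (right == null) {
                return "no-match:" + key + ":" + left;  // key present, nothing to join
            }
            return key + ":" + left + "+" + right;      // regular join result
        };
}
```

A plain ValueJoiner would receive (left, null) in both of the first two
cases and would have to treat them identically.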


Guozhang

On Mon, Jul 31, 2023 at 12:11 PM Florin Akermann
 wrote:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams


Re: [VOTE] KIP-954: expand default DSL store configuration to custom types

2023-07-29 Thread Guozhang Wang
Thanks Almog! I made a pass over the updated wiki and have no more questions. +1

Guozhang

On Wed, Jul 26, 2023 at 8:14 AM Almog Gavra  wrote:
>
> Hello Everyone,
>
> Opening the voting for KIP-954. The discussion is converging, but please
> feel free to chime in on the last few conversation points if you aren't
> happy with where it settled.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-954%3A+expand+default+DSL+store+configuration+to+custom+types
>
> Cheers,
> Almog


Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-07-27 Thread Guozhang Wang
Yes, that sounds right to me. Thanks Sophie.

On Thu, Jul 27, 2023 at 12:35 PM Sophie Blee-Goldman
 wrote:
>
> A2: Guozhang, just to close the book on the ListValue store thing, I fully
> agree it seems like overreach
> to expose/force this on users, especially if it's fully internal today. But
> just to make sure we're on the same
> page here, you're still ok with this KIP fixing the API gap that exists
> today, in which these stores cannot be
> customized by the user at all?
>
> In other words, after this KIP, the new behavior for the ListValue store in
> a stream join will be:
>
> S1: First, check if the user passed in a `DSLStoreSuppliers` (or whatever
>     the name will be) to the StreamJoined config object, and use that to
>     obtain the KVStoreSupplier for this ListValue store
>
> S2: If none was provided, check if the user has set a default
>     DSLStoreSuppliers via the new config, and use that to get the
>     KVStoreSupplier if so
>
> S3: If neither is set, fall back to the original logic as it is today,
>     which is to pass in a KVStoreSupplier that is hard-coded to be either
>     RocksDB or InMemory, based on what is returned for the #persistent
>     API by the StreamJoined's WindowStoreSupplier
>
> Does that sound right? We can clarify this further in the KIP if need be
>
> On Thu, Jul 27, 2023 at 10:48 AM Guozhang Wang 
> wrote:
>
> > Hi all,
> >
> > Like Almog's secretary as well! Just following up on that index:
> >
> > A2: I'm also happy without introducing versioned KV in this KIP as I
> > would envision it to be introduced as new params into the
> > KeyValuePluginParams in the future. And just to clarify on Sophie's
> > previous comment, I think ListStore should not be exposed in this API
> > until we see it as a common usage and hence would want to (again, we
> > need to think very carefully since it would potentially ask all
> > implementers to adopt) move it from the internal category to the
> > public interface category. As for now, I think only having kv / window
> > / session as public store types is fine.
> >
> > A3: Seems I was not making myself very clear at the beginning :P The
> > major thing I'd actually like to avoid is having two configs
> > co-exist for the same function, since it will be a confusing learning
> > curve for users. Hence what I was proposing is to just have the
> > newly introduced interface without introducing a new config, and I
> > realize now that it is actually more aligned with the CUSTOM idea,
> > where the ordering would be looking at config first, and then the
> > interface. I blushed as I read Almog likes it.. After thinking about
> > it twice, I'm now a bit leaning towards just deprecating the old
> > config with the new API+config as well.
> >
> > A5: Among the names we have discussed so far:
> >
> > DslStorePlugin
> > StoreTypeSpec
> > StoreImplSpec
> > DslStoreSuppliers
> >
> > I am in favor of DslStoreSuppliers, as well as restricting its
> > scope, just to echo Bruno's comments above.
> >
> >
> >
> > Guozhang
> >
> > On Thu, Jul 27, 2023 at 4:15 AM Bruno Cadonna  wrote:
> > >
> > > Hi,
> > >
> > > A5. I have to admit that
> > > "If we envision extending this beyond just StoreSupplier types, it could
> > > be a good option."
> > > is scaring me a bit.
> > > I am wondering what would be an example for such an extension?
> > > In general, I would propose to limit the scope of a config. In this case
> > > the config should provide suppliers for state stores for the DSL.
> > >
> > > BTW, maybe it is a good idea to let DslStorePlugin extend Configurable.
> > >
> > > Best,
> > > Bruno
> > >
> > > On 7/27/23 2:15 AM, Sophie Blee-Goldman wrote:
> > > > Thanks for the feedback Bruno -- sounds like we're getting close to a
> > final
> > > > consensus here.
> > > > It sounds like the two main (only?) semi-unresolved questions that
> > still
> > > > have differing
> > > > opinions floating around are whether to deprecate the old config, and
> > what
> > > > to name the new config
> > > > + interface.
> > > >
> > > > Although I won't personally push back on any of the options listed
> > above,
> > > > here's my final two cents:
> > > >
> > > > A3. I'm still a firm believer in deprecating the old config, and agree
> > > > wholeheartedly with what Bruno 

Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-07-27 Thread Guozhang Wang
ving in that
> >>> direction.
> >>>>>>> Given how much discussion there was on this KIP, which is minor
> >>>>>> relative to
> >>>>>>> making the changes to StoreBuilder API, I'd rather not tie the two
> >>>>>>> together.
> >>>>>>>
> >>>>>>> Cheers & Thanks everyone for the thoughts!
> >>>>>>> - Almog
> >>>>>>>
> >>>>>>> On Sun, Jul 23, 2023 at 5:15 PM Sophie Blee-Goldman <
> >>>>>>> ableegold...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Guozhang:
> >>>>>>>>
> >>>>>>>> On your 2nd point:
> >>>>>>>>
> >>>>>>>>> "impl types" (in hindsight it may not be a good name) for rocksdb
> >> /
> >>>>>>>> memory / custom, and we used "store types" for kv / windowed /
> >>>>>> sessioned
> >>>>>>>> etc,
> >>>>>>>> First off, thanks so much for this clarification -- using "store
> >>> type"
> >>>>>>> here
> >>>>>>>> was definitely making me uncomfortable as this usually refers to KV
> >>> vs
> >>>>>>>> window, etc -- but I just couldn't for the life of me think of the
> >>>>>> right
> >>>>>>>> term for rocks vs IM. We should 100% change to something like
> >>>>>>> StoreImplSpec
> >>>>>>>> for this kind of interface.
> >>>>>>>>
> >>>>>>>>> As for list-value store (for stream-stream Join)
> >>>>>>>> Again, glad you mentioned this -- I forgot how the extra
> >>> stream-stream
> >>>>>>> join
> >>>>>>>> store is not a "regular" KV Store but rather this special
> >> list-value
> >>>>>>> store.
> >>>>>>>> If we proceed with something like the current approach, perhaps
> >> that
> >>>>>>> should
> >>>>>>>> be a boolean (or enum) parameter in the KVConfig, similar to the
> >>>>>>>> EmitStrategy? After all, the high-level goal of this KIP is to be
> >>>>>> able to
> >>>>>>>> fully customize all DSL state stores, and this is currently not
> >>>>>> possible
> >>>>>>>> due to KAFKA-14976 <
> >>> https://issues.apache.org/jira/browse/KAFKA-14976
> >>>>>>> .
> >>>>>>>>
> >>>>>>>> If we expect there to be further customizations like this going
> >>>>>> forward,
> >>>>>>>> perhaps we could instead have each of the three StoreConfig classes
> >>>>>>> accept
> >>>>>>>> a single enum parameter for the "sub-type" (or whatever you want to
> >>>>>> call
> >>>>>>>> it), which would encompass (and replace) things like the
> >> EmitStrategy
> >>>>>> as
> >>>>>>>> well as the list-value type (we could define one enum for each
> >> Config
> >>>>>>> class
> >>>>>>>> so there is no accidentally requesting a LIST_VALUE subtype on a
> >>>>>>>> WindowStore). Thoughts?
> >>>>>>>>
> >>>>>>>> Lastly, regarding 3.b:
> >>>>>>>>
> >>>>>>>> I love that you brought this up because that is actually what I
> >> first
> >>>>>>>> proposed to Almog, ie introducing a param class to clean up the
> >>>>>>>> StoreBuilder API, during our chat that led to this KIP. He pushed
> >>>>>> back,
> >>>>>>>> claiming (rightly so) that this change would be huge in scope for a
> >>>>>>> purely
> >>>>>>>> aesthetic/API change that doesn't add any functionality, and that
> >> it
> >>>>>>> makes
> >>>>>>>> more sense to start with the DSL config since there is a clear gap
> >> in
> >>>

Re: plz help me code review

2023-07-27 Thread Guozhang Wang
Hello Xiangyuan,

Thanks for your contributions and thanks for raising it to the community's
attention. I will take a look at it sooner rather than later.

On Thu, Jul 27, 2023 at 2:41 AM Xiangyuan LI  wrote:
>
> Hi kafka team:
>   I raise a pr https://github.com/apache/kafka/pull/13965 to fix
> https://issues.apache.org/jira/browse/KAFKA-15106,
>   it mentions some serious bug and no one check it for a long time, plz
> help me review if could. thx!
>
>  these bugs are
> in 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.ConstrainedAssignmentBuilder#ConstrainedAssignmentBuilder
> and all of them could cause rebalance stuck for ever, and they could occur
> in production environment, very, very, very easy.


Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-07-23 Thread Guozhang Wang
Thanks everyone for the great discussions so far! I first saw the JIRA
and left some quick thoughts without being aware of the
already-written KIP (kudos to Almog, very great one) and the DISCUSS
thread here. And I happily find some of my initial thoughts align with
the KIP already :)

Would like to add a bit more of my 2c after reading through the KIP
and the thread here:

1. On the high level, I'm in favor of pushing this KIP through without
waiting on the other gaps to be closed. In my back pocket's
"dependency graph" of Kafka Streams roadmap of large changes or
feature gaps, the edges of dependencies are defined based on my
understanding of whether doing one first would largely complicate /
negate the effort of the other but not vice versa, in which case we
should consider getting the other done first. In this case, I feel
such a dependency is not strong enough, so encouraging the KIP
contributor to finish what he/she would love to do to close some gaps
early would be the higher priority. So far I do not see how just doing
this could leave us in a worse intermediate stage, but I could be
corrected.

2. Regarding the store types --- again here I'd like to just clarify
the terms a bit since in the past there has been some confusion: we used
"impl types" (in hindsight it may not be a good name) for rocksdb /
memory / custom, and we used "store types" for kv / windowed /
sessioned etc. As I said in the JIRA, I think the current proposal also
has a good side effect as a quality bar that really forces us to think
twice when trying to add more store types in the future, as it will
impact API instantiations. In the ideal world, I would consider:

* We have (timestamped) kv store, versioned kv store, window store,
session store as first-class DSL store types. Some DSL operators could
accept multiple store types (e.g. versioned and non versioned
kv-store) for semantics / efficiency trade-offs. But I think we would
remove un-timestamped kv stores eventually since that efficiency
trade-off is so minimal compared to its usage limitations.
* As for list-value store (for stream-stream Join), memory-lru-cache
(for PAPI use only), memory-time-ordered-buffer (for suppression),
they would not be exposed as DSL first-class store types in the
future. Instead, they would be treated as internal used stores (e.g.
list-value store is built on key-value store with specialized serde
and putInternal), or continue to be just convenient OOTB PAPI used
stores only.
* As we move on, we will continue to be very, very strict on what
would be added as DSL store types (and hence requires changes to the
proposed APIs), what to be added as convenient OOTB PAPI store impls
only, what to be added as internal used store types that should not be
exposed to users nor customizable at all.

3. Some more detailed thoughts below:

3.a) I originally also thought that we can extend the existing config,
rather than replacing it. The difference was that I was thinking that
order-wise, the runtime would look at the API first, and then the
config, whereas in your rejected alternative it was looking at the
config first, and then the API --- that I think is a minor thing and
either is fine. I'm in agreement that having two configs would be more
confusing to users to learn about their precedence rather than
helpful, but if we keep both a config and a public API, then the
precedence ordering would not be so confusing as long as we state them
clearly. For example:

* We have DefaultStoreTypeSpec OOTB, in that impl we look at the
config only, and would only expect either ROCKS or MEMORY, and return
corresponding OOTB store impls; if any other values configured, we
error out.
* Users extend that by having MyStoreTypeSpec, in which they could do
arbitrary things without respecting the config at all, but our
recommended pattern in docs would still say "look into the config, if
it is ROCKS or MEMORY just fall back to DefaultStoreTypeSpec;
otherwise if it's some String you recognize, then return your
customized store based on the string value, otherwise error out".
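
The recommended pattern in the two bullets above could look roughly like
the sketch below. Everything in it is an assumption made for illustration:
the StoreTypeSpec interface, the "store.type" config key, the "MY_STORE"
value, the string labels standing in for real store impls, and the choice
of ROCKS when nothing is configured are all hypothetical and are not the
API proposed in the KIP.

```java
import java.util.Map;

// Hypothetical interface: maps a config to a label naming the store impl to build.
interface StoreTypeSpec {
    String storeImplFor(Map<String, String> config);
}

// OOTB spec: looks at the config only and accepts ROCKS or MEMORY.
class DefaultStoreTypeSpec implements StoreTypeSpec {
    // Hypothetical config key, chosen only for this sketch.
    static final String CONFIG_KEY = "store.type";

    @Override
    public String storeImplFor(Map<String, String> config) {
        // Assumption: an unset config falls back to ROCKS.
        String type = config.getOrDefault(CONFIG_KEY, "ROCKS");
        switch (type) {
            case "ROCKS":
                return "rocksdb-store";
            case "MEMORY":
                return "in-memory-store";
            default:
                // Any other configured value is an error for the OOTB spec.
                throw new IllegalArgumentException("Unknown store type: " + type);
        }
    }
}

// User spec: delegates for the OOTB values, handles its own value, else errors out.
class MyStoreTypeSpec implements StoreTypeSpec {
    private final StoreTypeSpec fallback = new DefaultStoreTypeSpec();

    @Override
    public String storeImplFor(Map<String, String> config) {
        String type = config.getOrDefault(DefaultStoreTypeSpec.CONFIG_KEY, "ROCKS");
        if (type.equals("ROCKS") || type.equals("MEMORY")) {
            return fallback.storeImplFor(config);
        }
        if (type.equals("MY_STORE")) {
            return "my-custom-store";
        }
        throw new IllegalArgumentException("Unknown store type: " + type);
    }
}

class StoreSpecDemo {
    public static void main(String[] args) {
        StoreTypeSpec spec = new MyStoreTypeSpec();
        System.out.println(spec.storeImplFor(Map.of("store.type", "MEMORY")));
        System.out.println(spec.storeImplFor(Map.of("store.type", "MY_STORE")));
    }
}
```

With this shape the precedence is easy to state: the spec class decides,
and the config is just an input that a well-behaved spec consults first.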

3.b) About the struct-like Params classes, I like the idea a lot and
wish we had pursued this in the first place, but if we only do this
in Spec it would leave some inconsistencies with the StoreBuilders,
though arguably the latter is only for PAPI. I'm wondering if we
should consider including the changes in StoreBuilders (e.g.
WindowStoreBuilder(WindowSupplierParams)), and if yes, maybe we should
also consider renaming that e.g. `WindowSupplierParams` to
`WindowStoreSpecParams` too? For this one I only have a "weak feeling"
so I can be convinced otherwise :P

Thanks,
Guozhang



On Sun, Jul 23, 2023 at 9:52 AM Matthias J. Sax  wrote:
>
> Thanks for all the input. My intention was not to block the KIP, but
> just to take a step back and try get a holistic picture and discussion,
> to explore if there are good/viable alternative designs. As said
> originally, I really like to close this gap, and was always aware 

Re: Request permission to contribute

2023-07-23 Thread Guozhang Wang
Hello Taras,

I saw your ID in the contributors list already, could you check if you
can create JIRAs now?

Guozhang

On Fri, Jul 21, 2023 at 4:00 AM Taras Ledkov  wrote:
>
> Hi, Kafka Team.
>
> I'm following this wiki to request permission to contribute to Apache Kafka
> https://cwiki.apache.org/confluence/display/kafka/kafka+improvement+proposals#KafkaImprovementProposals-GettingStarted
>
> I'll propose custom SSL factory for Kafka Connect REST server 
> [org.apache.kafka.connect.runtime.rest.RestServer].
> Kafka connect REST server can be configured only with file based key stores 
> in current implementation.
>
> My wiki ID and jira ID are both: tledkov (tled...@apache.org)
> Can I get permission please?
>
> --
> With best regards,
> Taras Ledkov
>


Re: [ANNOUNCE] New committer: Greg Harris

2023-07-10 Thread Guozhang Wang
Congratulations!

On Mon, Jul 10, 2023 at 9:17 AM Randall Hauch  wrote:
>
> Congratulations, Greg.
>
> On Mon, Jul 10, 2023 at 11:13 AM Mickael Maison 
> wrote:
>
> > Congratulations Greg!
> >
> > On Mon, Jul 10, 2023 at 6:08 PM Bill Bejeck 
> > wrote:
> > >
> > > Congrats Greg!
> > >
> > > -Bill
> > >
> > > On Mon, Jul 10, 2023 at 11:53 AM Divij Vaidya 
> > > wrote:
> > >
> > > > Congratulations Greg! I am going through a new committer teething
> > process
> > > > right now and would be happy to get you up to speed. Looking forward to
> > > > working with you in your new role.
> > > >
> > > > --
> > > > Divij Vaidya
> > > >
> > > >
> > > >
> > > > > On Mon, Jul 10, 2023 at 5:51 PM Josep Prat
> > > > > wrote:
> > > >
> > > > > Congrats Greg!
> > > > >
> > > > >
> > > > > ———
> > > > > Josep Prat
> > > > >
> > > > > Aiven Deutschland GmbH
> > > > >
> > > > > Alexanderufer 3-7, 10117 Berlin
> > > > >
> > > > > Amtsgericht Charlottenburg, HRB 209739 B
> > > > >
> > > > > Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> > > > >
> > > > > m: +491715557497
> > > > >
> > > > > w: aiven.io
> > > > >
> > > > > e: josep.p...@aiven.io
> > > > >
> > > > > On Mon, Jul 10, 2023, 17:47 Matthias J. Sax 
> > wrote:
> > > > >
> > > > > > Congrats!
> > > > > >
> > > > > > On 7/10/23 8:45 AM, Chris Egerton wrote:
> > > > > > > Hi all,
> > > > > > >
> > > > > > > The PMC for Apache Kafka has invited Greg Harris to become a
> > > > committer,
> > > > > > and
> > > > > > > we are happy to announce that he has accepted!
> > > > > > >
> > > > > > > Greg has been contributing to Kafka since 2019. He has made over
> > 50
> > > > > > commits
> > > > > > > mostly around Kafka Connect and Mirror Maker 2. His most notable
> > > > > > > contributions include KIP-898: "Modernize Connect plugin
> > discovery"
> > > > > and a
> > > > > > > deep overhaul of the offset syncing logic in MM2 that addressed
> > > > several
> > > > > > > technically-difficult, long-standing, high-impact issues.
> > > > > > >
> > > > > > > He has also been an active participant in discussions and
> > reviews on
> > > > > the
> > > > > > > mailing lists and on GitHub.
> > > > > > >
> > > > > > > Thanks for all of your contributions, Greg. Congratulations!
> > > > > > >
> > > > > >
> > > > >
> > > >
> >


Re: [ANNOUNCE] New PMC chair: Mickael Maison

2023-04-24 Thread Guozhang Wang
Congratulations Mickael!


Guozhang

On Sat, Apr 22, 2023 at 9:30 PM Kowshik Prakasam  wrote:
>
> Thanks a lot Jun for your hard work and contributions over the years.
> Congrats Mickael on your new role, well deserved! Wishing both of you, and
> the rest of the community the very best!
>
>
> Cheers,
> Kowshik
>
> On Sat, Apr 22, 2023, 6:38 PM Satish Duggana 
> wrote:
>
> > Thanks a lot Jun for your contributions as PMC chair for all these years.
> >
> > Congratulations Mickael on your new role.
> >
> > On Sat, 22 Apr 2023 at 17:42, Manyanda Chitimbo
> >  wrote:
> > >
> > > Congratulations Mickael.
> > > And thanks Jun for the work over the years.
> > >
> > > On Fri, Apr 21, 2023 at 5:10 PM Jun Rao 
> > wrote:
> > >
> > > > Hi, everyone,
> > > >
> > > > After more than 10 years, I am stepping down as the PMC chair of Apache
> > > > Kafka. We now have a new chair Mickael Maison, who has been a PMC
> > member
> > > > since 2020. I plan to continue to contribute to Apache Kafka myself.
> > > >
> > > > Congratulations, Mickael!
> > > >
> > > > Jun
> > > >
> > >
> > >
> > > --
> > > Manyanda Chitimbo.
> >


Re: Fwd: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores

2023-04-11 Thread Guozhang Wang
" which is to be removed from the aggregate as part of
> >>>  applying an update. To fix this, aggregations should ignore 
> >>> out-of-order
> >>>  records when aggregating versioned tables.
> >>> - In order to implement this change, table aggregate processors 
> >>> need
> >>> a way to determine whether a record is out-of-order or not. This
> >>> cannot be
> >>> done by querying the source table value getter as that store 
> >>> belongs to a
> >>> different subtopology (because a repartition occurs before
> >>> aggregation). As
> >>> such, an additional timestamp must be included in the repartition 
> >>> topic.
> >>> The 3.5 release already includes an update to the repartition
> >>> topic format
> >>> (with upgrade implications properly handled) via KIP-904
> >>> 
> >>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-904%3A+Kafka+Streams+-+Guarantee+subtractor+is+called+before+adder+if+key+has+not+changed>,
> >>> so making an additional change to the repartition topic format to 
> >>> add a
> >>> timestamp comes at no additional cost to users.
> >>>
> >>>
> >>> I have updated the KIP
> >>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores>
> >>> itself with more detail about each of these changes. Please let me know if
> >>> there are any concerns. In the absence of dissent, I'd like to include
> >>> these changes along with the rest of KIP-914 in the 3.5 release.
> >>>
> >>> Apologies for not noticing these additional semantics implications 
> >>> earlier,
> >>> Victoria
> >>>
> >>> -- Forwarded message -
> >>> From: Victoria Xia 
> >>> Date: Wed, Mar 22, 2023 at 10:08 AM
> >>> Subject: Re: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores
> >>> To: 
> >>>
> >>>
> >>> Thanks for voting, everyone! We have three binding yes votes with no
> >>> objections during four full days of voting. I will close the vote and mark
> >>> the KIP as accepted, right in time for the 3.5 release.
> >>>
> >>> Thanks,
> >>> Victoria
> >>>
> >>> On Wed, Mar 22, 2023 at 7:11 AM Bruno Cadonna  wrote:
> >>>
> >>>> +1 (binding)
> >>>>
> >>>> Thanks Victoria!
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On 20.03.23 17:13, Matthias J. Sax wrote:
> >>>>> +1 (binding)
> >>>>>
> >>>>> On 3/20/23 9:05 AM, Guozhang Wang wrote:
> >>>>>> +1, thank you Victoria!
> >>>>>>
> >>>>>> On Sat, Mar 18, 2023 at 8:27 AM Victoria Xia
> >>>>>>  wrote:
> >>>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> I'd like to start a vote on KIP-914 for updating the Kafka Streams 
> >>>>>>> join
> >>>>>>> processors to use proper timestamp-based semantics in applications 
> >>>>>>> with
> >>>>>>> versioned stores:
> >>>>>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores
> >>>>>>>
> >>>>>>> To avoid compatibility concerns, I'd like to include the changes from
> >>>>>>> this
> >>>>>>> KIP together with KIP-889
> >>>>>>> <
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores
> >>>>>
> >>>>>>> (for introducing versioned stores) in the upcoming 3.5 release. I will
> >>>>>>> close the vote on the 3.5 KIP deadline, March 22, if there are no
> >>>>>>> objections before then.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Victoria
> >>>>
> >>>


Re: Failing tests in Kafka stream is under investigation now

2023-04-06 Thread Guozhang Wang
Hi Luke,

Thanks for investigating it. I think my PR broke some tests somehow
--- I'm still looking into why the local runs did not catch
them. My apologies.

Guozhang

On Thu, Apr 6, 2023 at 4:57 AM Chia-Ping Tsai  wrote:
>
> hi Luke
>
> Thanks for your time and effort. It would be nice to see the green CI again :)
>
> —
> Chia-Ping
>
> > On Apr 6, 2023, at 6:13 PM, Luke Chen wrote:
> >
> > Hi all,
> >
> > Just want to let you know that our current CI test results contain
> > many (more than 600 in build 1738) failed tests. I'm already
> > investigating them, and have identified some errors; a PR has been
> > raised and is running the CI tests now. Hopefully, it can bring the
> > healthy CI tests back soon.
> >
> > Stay tuned.
> >
> > Luke


[jira] [Created] (KAFKA-14847) Separate the callers of commitAllTasks v.s. commitTasks for EOS(-v2) and ALOS

2023-03-24 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-14847:
-

 Summary: Separate the callers of commitAllTasks v.s. commitTasks 
for EOS(-v2) and ALOS
 Key: KAFKA-14847
 URL: https://issues.apache.org/jira/browse/KAFKA-14847
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


Today, EOS-v2/v1 and ALOS share the same internal call path inside 
TaskManager/TaskExecutor for committing tasks from various scenarios. The call 
path {{commitTasksAndMaybeUpdateCommitableOffsets}} -> 
{{commitOffsetsOrTransaction}} takes in a list of tasks as its input, which can 
be a subset of the tasks that the thread / task manager owns. For EOS-v1 / ALOS, 
it is fine to commit just a subset of the tasks; however for EOS-v2, since 
all tasks participate in the same txn it could lead to dangerous violations, 
and today we are relying on all the callers of the commit function to make sure 
that the list of tasks they pass in, under EOS-v2, would still not violate 
the semantics. As summarized (thanks to Matthias), today that callee 
could be triggered in the following cases:

1) Inside handleRevocation() -- this is a clean path, and we add all non-revoked 
tasks with commitNeeded() flag set to the commit -- so this seems to be fine.
2) tryCloseCleanAllActiveTasks() -- here we only call it, if 
tasksToCloseDirty.isEmpty() -- so it seems fine, too.
3) commit() with a list of tasks handed in -- we call commit() inside the TM 
three times
3.a) inside commitAll() as commit(tasks.values()) (passing in all tasks)
3.b) inside maybeCommitActiveTasksPerUserRequested as 
commit(activeTaskIterable()); (passing in all tasks)
3.c) inside handleCorruption() -- here, we only consider RUNNING and RESTORING 
tasks, which are not corrupted -- note we only throw a TaskCorruptedException 
during restore state initialization, thus, corrupted tasks did not process 
anything yet, and all other tasks should be clean to be committed.
3.d) commitSuccessfullyProcessedTasks() -- under EOS-v2, as we just commit a 
subset of tasks' source offsets while at the same time we still commit those 
unsuccessful task's outgoing records if there are any.

Just going through this list of callers itself, as demonstrated above, is 
already pretty complex, and very vulnerable to bugs. It's better to not rely on 
the callers, but the callees to make sure that's the case. More concretely, I 
think we can introduce a new function called {{commitAllTasks}} such that under 
EOS-v2, the caller always calls {{commitAllTasks}} instead, and if there are 
some tasks that should not be committed because we know they have not processed 
any data, the {{commitAllTasks}} callee itself would do some clever filtering 
internally.

Given its scope, I think it's better to do this refactoring after EOS-v1 is 
removed.
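
A minimal sketch of the proposed split, assuming hypothetical stand-in 
classes (SketchTask, SketchTaskManager) rather than the real Streams 
internals. The only idea it illustrates is that under EOS-v2 the caller 
cannot pass a subset, and the callee filters out tasks that have nothing 
to commit. Using the commitNeeded flag as the filter is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a stream task; only the fields the sketch needs.
class SketchTask {
    final String id;
    final boolean commitNeeded;

    SketchTask(String id, boolean commitNeeded) {
        this.id = id;
        this.commitNeeded = commitNeeded;
    }
}

// Hypothetical stand-in for the task manager.
class SketchTaskManager {
    private final List<SketchTask> ownedTasks;
    private final boolean eosV2;

    SketchTaskManager(List<SketchTask> ownedTasks, boolean eosV2) {
        this.ownedTasks = ownedTasks;
        this.eosV2 = eosV2;
    }

    // Subset commits stay available where they are safe (ALOS / EOS-v1).
    List<String> commitTasks(List<SketchTask> subset) {
        if (eosV2) {
            throw new IllegalStateException("EOS-v2 callers must use commitAllTasks()");
        }
        return commit(subset);
    }

    // Under EOS-v2 the callee, not the caller, decides what joins the commit.
    List<String> commitAllTasks() {
        return commit(ownedTasks);
    }

    // Returns the ids of the tasks that were actually committed.
    private List<String> commit(List<SketchTask> tasks) {
        List<String> committed = new ArrayList<>();
        for (SketchTask task : tasks) {
            if (task.commitNeeded) { // skip tasks that processed nothing
                committed.add(task.id);
            }
        }
        return committed;
    }
}

class CommitAllTasksDemo {
    public static void main(String[] args) {
        SketchTaskManager tm = new SketchTaskManager(
            List.of(new SketchTask("0_0", true), new SketchTask("0_1", false)), true);
        System.out.println(tm.commitAllTasks());
    }
}
```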



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores

2023-03-20 Thread Guozhang Wang
+1, thank you Victoria!

On Sat, Mar 18, 2023 at 8:27 AM Victoria Xia
 wrote:
>
> Hi all,
>
> I'd like to start a vote on KIP-914 for updating the Kafka Streams join
> processors to use proper timestamp-based semantics in applications with
> versioned stores:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores
>
> To avoid compatibility concerns, I'd like to include the changes from this
> KIP together with KIP-889
> 
> (for introducing versioned stores) in the upcoming 3.5 release. I will
> close the vote on the 3.5 KIP deadline, March 22, if there are no
> objections before then.
>
> Thanks,
> Victoria


Re: [DISCUSS] KIP-914 Join Processor Semantics for Versioned Stores

2023-03-17 Thread Guozhang Wang
Thanks Matthias / Victoria, both bullet points make sense to me.
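
For readers of the archive, the stream-table join point in the quoted
thread can be sketched as follows. This is only an illustration under
stated assumptions: SketchVersionedStore is a hypothetical single-key
stand-in (not the KIP-889 interface), and the retention check against the
store's own observed stream time is a simplification of what the thread
describes. It shows that a null from get(ts) looks the same whether no
version exists or history retention was exceeded, and how inner and left
joins treat it.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical, single-key stand-in for a versioned store.
class SketchVersionedStore {
    private final TreeMap<Long, String> versions = new TreeMap<>();
    private final long historyRetentionMs;
    private long observedStreamTime = Long.MIN_VALUE;

    SketchVersionedStore(long historyRetentionMs) {
        this.historyRetentionMs = historyRetentionMs;
    }

    void put(String value, long ts) {
        versions.put(ts, value);
        observedStreamTime = Math.max(observedStreamTime, ts);
    }

    // Returns null when ts is older than the retained history OR when no
    // version exists at ts; the caller cannot tell the two cases apart.
    String get(long ts) {
        if (observedStreamTime != Long.MIN_VALUE
                && ts < observedStreamTime - historyRetentionMs) {
            return null;
        }
        Map.Entry<Long, String> entry = versions.floorEntry(ts);
        return entry == null ? null : entry.getValue();
    }
}

class SketchStreamTableJoin {
    // Inner join: a null lookup produces no output record.
    static Optional<String> innerJoin(String streamValue, long ts, SketchVersionedStore table) {
        String tableValue = table.get(ts);
        return tableValue == null
            ? Optional.empty()
            : Optional.of(streamValue + "+" + tableValue);
    }

    // Left join: the stream record is still emitted, paired with null.
    static String leftJoin(String streamValue, long ts, SketchVersionedStore table) {
        return streamValue + "+" + table.get(ts);
    }

    public static void main(String[] args) {
        SketchVersionedStore table = new SketchVersionedStore(50);
        table.put("a", 100);
        table.put("b", 200);
        System.out.println(innerJoin("s", 120, table)); // lookup is older than retained history
        System.out.println(leftJoin("s", 120, table));
        System.out.println(innerJoin("s", 160, table));
    }
}
```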

On Thu, Mar 16, 2023 at 10:39 AM Victoria Xia
 wrote:
>
> Thanks for your comments, Matthias!
>
> > For stream-table joins, I think we need to elaborate that a `get(k, ts)`
> call now might return `null` if the history retention of the store is too
> short.
>
> Great callout -- I agree we should definitely clarify this in the KIP and
> mention it in the eventual docs as well.
>
> When a call to `get(k, ts)` returns null, there's not really a good way to
> distinguish whether it's because the timestamp is outside of the store's
> history retention or if it's because there's actually no record version for
> the key at the specified timestamp. Determining this from the processor
> would require (1) exposing the store's history retention to the processor,
> and (2) reconciling the fact that state stores today (including the new
> versioned store implementation) track their own observed stream time
> separate from processor time.
>
> In light of this, I think your proposal to treat a null from `get(k, ts)`
> due to history retention having been exceeded the same as we'd treat any
> other null makes sense, and is also our only viable option right now. I'll
> call this out in the docs so users are aware that their choice of history
> retention has this implication.
>
> > For left-table-table joins, there seems to be no special impact, but it
> should be called out, too. The lookup itself does not go into the history
> of the table so no change here (as we don't have the "query older than
> history case")
>
> Yup, we're on the same page. Using a versioned store for table-table joins
> results in the semantic change that the join result will include the
> latest-by-timestamp record rather than the latest-by-offset record, but no
> timestamped lookups (i.e., `get(key, ts)` calls) are used in the process so
> there is no concern about history retention having elapsed and affecting
> join results. (The only implication of history retention for this use case
> is indirect, since history retention doubles as grace period for the store.
> Because grace period is per store instance, which has task-level
> granularity, that means if grace period is set too low then the latest
> record for one key could be dropped from the store if another key has
> already advanced the store's observed stream time past the grace period by
> the time that this record is seen.)
>
> I will update the KIP with these additional notes.
>
> Thanks,
> Victoria
>
> On Wed, Mar 15, 2023 at 7:16 PM Matthias J. Sax  wrote:
>
> > Thanks for the KIP! Great to see a first step towards using the new
> > versioned stores!
> >
> > I think the described tradeoffs make sense and I like making a pragmatic
> > step in the right direction, and avoiding boiling the ocean. Thus, I
> > agree to the proposed solution.
> >
> > One minor thing, that I believe just need clarification in the KIP (does
> > not seem to be a change to the KIP itself):
> >
> > For stream-table joins, I think we need to elaborate that a `get(k, ts)`
> > call now might return `null` if the history retention of the store is
> > too short. For inner-joins it would result in no output record (ie,
> > stream input record is dropped). Would be good to have it mentioned in
> > the KIP explicitly.
> >
> > We should also discuss how left-joins should work for this case. I think
> > it's ok (better) to include the stream record in the result if the
> > lookup returns `null` -- either because no key exists in the existing
> > history for the provided timestamp, or (the actual case in question)
> > because we query older than available history. If you agree, can we add
> > this to the KIP?
> >
> > For left-table-table joins, there seems to be no special impact, but it
> > should be called out, too. The lookup itself does not go into the
> > history of the table so no change here (as we don't have the "query
> > older than history case") -- and for out-of-order records, we just
> > "drop" them anyway, so no change for left-joins either I believe.
> >
> >
> > -Matthias
> >
> >
> >
> > On 3/15/23 2:00 PM, Guozhang Wang wrote:
> > > Sounds good to me. Thanks!
> > >
> > > On Wed, Mar 15, 2023 at 12:07 PM Victoria Xia
> > >  wrote:
> > >>
> > >> Thanks for kicking off the discussion, John and Guozhang!
> > >>
> > >>> Just one thing that might be out of scope: if users want to enable the
> > >> versioned table feature across the topology, should we allow them

Re: [DISCUSS] KIP-914 Join Processor Semantics for Versioned Stores

2023-03-15 Thread Guozhang Wang
Sounds good to me. Thanks!

On Wed, Mar 15, 2023 at 12:07 PM Victoria Xia
 wrote:
>
> Thanks for kicking off the discussion, John and Guozhang!
>
> > Just one thing that might be out of scope: if users want to enable the
> versioned table feature across the topology, should we allow them to do it
> via a single config rather than changing the materialized object at each
> place?
>
> Yes, I think this would be a great usability improvement and am in favor of
> introducing such a config. As long as the config defaults to using
> unversioned stores (which makes sense anyway), there will be no
> compatibility concerns with introducing the config in a future release.
> It's out of scope for this particular KIP as a result, but can hopefully be
> introduced as part of the next release after 3.5.
>
> Best,
> Victoria
>
> On Wed, Mar 15, 2023 at 10:49 AM Guozhang Wang 
> wrote:
>
> > Thanks Victoria for the great writeup, with a thorough analysis and
> > trade-offs. I do not have any major questions about the proposal.
> >
> > Just one thing that might be out of scope: if users want to enable the
> > versioned table feature across the topology, should we allow them to
> > do it via a single config rather than changing the materialized object
> > at each place? Maybe we can defer that for future discussions, but
> > just want to hear your thoughts.
> >
> > Anyways, I think this proposal is great just as-is even if we agree to
> > do the configuration improvement later.
> >
> >
> > Guozhang
> >
> > On Thu, Mar 9, 2023 at 7:52 PM John Roesler  wrote:
> > >
> > > Thanks for the KIP, Victoria!
> > >
> > > I had some questions/concerns, but you addressed them in the Rejected
> > Alternatives section. Thanks for the thorough proposal!
> > >
> > > -John
> > >
> > > On Thu, Mar 9, 2023, at 18:59, Victoria Xia wrote:
> > > > Hi everyone,
> > > >
> > > > I have a proposal for updating Kafka Streams's stream-table join and
> > > > table-table join semantics for the new versioned key-value state stores
> > > > introduced in KIP-889
> > > > <
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores
> > >.
> > > > Would love to hear your thoughts and suggestions.
> > > >
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores
> > > >
> > > > Thanks,
> > > > Victoria
> >


Re: [DISCUSS] KIP-914 Join Processor Semantics for Versioned Stores

2023-03-15 Thread Guozhang Wang
Thanks Victoria for the great writeup, with a thorough analysis and
trade-offs. I do not have any major questions about the proposal.

Just one thing that might be out of scope: if users want to enable the
versioned table feature across the topology, should we allow them to
do it via a single config rather than changing the materialized object
at each place? Maybe we can defer that for future discussions, but
just want to hear your thoughts.

Anyways, I think this proposal is great just as-is even if we agree to
do the configuration improvement later.


Guozhang

On Thu, Mar 9, 2023 at 7:52 PM John Roesler  wrote:
>
> Thanks for the KIP, Victoria!
>
> I had some questions/concerns, but you addressed them in the Rejected 
> Alternatives section. Thanks for the thorough proposal!
>
> -John
>
> On Thu, Mar 9, 2023, at 18:59, Victoria Xia wrote:
> > Hi everyone,
> >
> > I have a proposal for updating Kafka Streams's stream-table join and
> > table-table join semantics for the new versioned key-value state stores
> > introduced in KIP-889.
> > Would love to hear your thoughts and suggestions.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores
> >
> > Thanks,
> > Victoria


Re: Regarding Jira Account Creation

2023-03-14 Thread Guozhang Wang
Hi Rohit,

You can request the account creation in this url:
https://selfserve.apache.org/jira-account.html

On Tue, Mar 14, 2023 at 7:19 AM Rohit -  wrote:
>
> Hi, I am Rohit, working as a Software Engineer at Confluent. Recently I
> started working on https://issues.apache.org/jira/browse/KAFKA-14401. And
> raised a PR for the same here(https://github.com/apache/kafka/pull/13361).
> I wanted to have a JIRA account to assign the issue to myself and work on
> KAFKA-14401. Can you please help me with the creation of a JIRA account
> with username "rohits64" and email as either rohitsanja...@gmail.com or
> roh...@confluent.io.
>
> Thanks and Regards
> Rohit
> Software Engineer
> Confluent


Re: [ANNOUNCE] New Kafka PMC Member: David Arthur

2023-03-10 Thread Guozhang Wang
Congrats David!

On Fri, Mar 10, 2023 at 7:29 AM Randall Hauch  wrote:
>
> Congratulations, David!
>
> Randall
>
> On Fri, Mar 10, 2023 at 5:17 AM Viktor Somogyi-Vass
>  wrote:
>
> > Congrats David!
> >
> > On Fri, Mar 10, 2023 at 9:12 AM Tom Bentley  wrote:
> >
> > > Congratulations!
> > >
> > > On Fri, 10 Mar 2023 at 03:36, John Roesler  wrote:
> > >
> > > > Congratulations, David!
> > > > -John
> > > >
> > > > On Thu, Mar 9, 2023, at 20:18, ziming deng wrote:
> > > > > Congrats David!
> > > > >
> > > > > Ziming
> > > > >
> > > > >> On Mar 10, 2023, at 10:02, Luke Chen  wrote:
> > > > >>
> > > > >> Congratulations, David!
> > > > >>
> > > > >> On Fri, Mar 10, 2023 at 9:56 AM Yash Mayya 
> > > > wrote:
> > > > >>
> > > > >>> Congrats David!
> > > > >>>
> > > > >>> On Thu, Mar 9, 2023, 23:42 Jun Rao 
> > wrote:
> > > > >>>
> > > > > >>>> Hi, Everyone,
> > > > > >>>>
> > > > > >>>> David Arthur has been a Kafka committer since 2013. He has been
> > > very
> > > > > >>>> instrumental to the community since becoming a committer. It's my
> > > > > >>> pleasure
> > > > > >>>> to announce that David is now a member of Kafka PMC.
> > > > > >>>>
> > > > > >>>> Congratulations David!
> > > > > >>>>
> > > > > >>>> Jun
> > > > > >>>> on behalf of Apache Kafka PMC
> > > > > >>>>
> > > > >>>
> > > >
> > > >
> > >
> >


Re: [ANNOUNCE] New Kafka PMC Member: Chris Egerton

2023-03-10 Thread Guozhang Wang
Congrats Chris!

On Fri, Mar 10, 2023 at 8:41 AM Jeremy Custenborder
 wrote:
>
> Congrats buddy!
>
> On Fri, Mar 10, 2023 at 9:28 AM Randall Hauch  wrote:
> >
> > Congratulations, Chris!
> >
> > Randall
> >
> > On Fri, Mar 10, 2023 at 9:07 AM David Arthur
> >  wrote:
> >
> > > Congrats, Chris!
> > >
> > > On Fri, Mar 10, 2023 at 8:27 AM Matthew Benedict de Detrich
> > >  wrote:
> > >
> > > > Congrats, well deserved!
> > > >
> > > > On Thu, 9 Mar 2023, 19:12 Jun Rao,  wrote:
> > > >
> > > > > Hi, Everyone,
> > > > >
> > > > > Chris Egerton has been a Kafka committer since July 2022. He has been
> > > > very
> > > > > instrumental to the community since becoming a committer. It's my
> > > > pleasure
> > > > > to announce that Chris is now a member of Kafka PMC.
> > > > >
> > > > > Congratulations Chris!
> > > > >
> > > > > Jun
> > > > > on behalf of Apache Kafka PMC
> > > > >
> > > >
> > >
> > >
> > > --
> > > -David
> > >


[jira] [Resolved] (KAFKA-12639) AbstractCoordinator ignores backoff timeout when joining the consumer group

2023-02-28 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12639.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> AbstractCoordinator ignores backoff timeout when joining the consumer group
> ---
>
> Key: KAFKA-12639
> URL: https://issues.apache.org/jira/browse/KAFKA-12639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 2.7.0
>Reporter: Matiss Gutmanis
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.5.0
>
>
> We observed heavy logging while trying to join a consumer group during partial 
> unavailability of the Kafka cluster (it's part of our testing process). It seems 
> that {{rebalanceConfig.retryBackoffMs}}, used in 
> {{org.apache.kafka.clients.consumer.internals.AbstractCoordinator#joinGroupIfNeeded}},
> is not respected. Debugging revealed that the {{Timer}} instance has technically 
> already expired, so a sleep of 0 milliseconds is used, which defeats the purpose 
> of the backoff timeout.
> The minimal backoff timeout should be respected.
>  
> {code:java}
> 2021-03-30 08:30:24,488 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 
> 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,488 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,488 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> 2021-03-30 08:30:24,489 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 
> 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,489 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,489 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> 2021-03-30 08:30:24,490 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 
> 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,490 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,490 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> 2021-03-30 08:30:24,491 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 
> 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,491 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,491 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> 2021-03-30 08:30:24,492 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 
> 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,492 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,492 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-907: Add Boolean Serde to public interface

2023-02-27 Thread Guozhang Wang
+1, thanks!

On Fri, Feb 24, 2023 at 5:43 AM SpacRocket 
wrote:

> Hi Everyone,
>
> I'd like to call for a vote on KIP-907, which proposes new public classes
> to the package org.apache.kafka.common.serialization:
> - BooleanSerde
> - BooleanSerializer
> - BooleanDeserializer
>
> KIP:
> lists.apache.org
>
> - Jakub
>
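
For readers who have not opened the KIP, here is a minimal sketch of what a boolean serializer/deserializer pair could look like. The class name `BooleanCodecSketch` and the single-byte encoding (0x01 for true, 0x00 for false) are assumptions made for illustration and are not taken from the vote email. The proposed classes would implement Kafka's `Serializer` and `Deserializer` interfaces, which are left out here so the example has no dependencies.

```java
// Hypothetical stand-in for the proposed BooleanSerializer/BooleanDeserializer pair.
final class BooleanCodecSketch {
    private static final byte TRUE = 0x01;   // assumed encoding for true
    private static final byte FALSE = 0x00;  // assumed encoding for false

    private BooleanCodecSketch() {}

    // A null value is passed through as null rather than encoded.
    static byte[] serialize(Boolean value) {
        if (value == null) {
            return null;
        }
        return new byte[] {value ? TRUE : FALSE};
    }

    // Rejects anything that is not exactly one byte holding 0x00 or 0x01.
    static Boolean deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length != 1) {
            throw new IllegalArgumentException("Expected 1 byte but got " + data.length);
        }
        if (data[0] == TRUE) {
            return Boolean.TRUE;
        }
        if (data[0] == FALSE) {
            return Boolean.FALSE;
        }
        throw new IllegalArgumentException("Unexpected byte: " + data[0]);
    }
}
```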


Re: [VOTE] KIP-904: Kafka Streams - Guarantee subtractor is called before adder if key has not changed

2023-02-27 Thread Guozhang Wang
+1.

On Sun, Feb 26, 2023 at 4:27 PM Fq Public  wrote:
>
> Hi everyone,
>
> I'd like to start the vote on KIP-904: Kafka Streams - Guarantee subtractor
> is called before adder if key has not changed.
> The KIP is available here: https://cwiki.apache.org/confluence/x/P5VbDg
> The easiest way to view the entire discussion thread is via this search
> link: https://lists.apache.org/list?dev@kafka.apache.org:lte=1M:KIP-904
> Please take a look and vote.
>
> Thank you,
> Farooq


Re: [VOTE] KIP-890: Transactions Server Side Defense

2023-02-23 Thread Guozhang Wang
Thanks Justine. I checked the diff between the two versions on the wiki;
it seems the major changes are:

1) Move the `verifyOnly` field of the request into each transaction
and hence we no longer have any top-level primitive fields.
2) Add a top-level `errorCode` field in the response.

Is that summary right?


Guozhang

On Wed, Feb 22, 2023 at 4:51 PM Justine Olshan
 wrote:
>
> Hey all,
>
> I've updated the KIP to slightly change some of the request and response
> specs for AddPartitionsToTxn. Nothing huge, but some points came up during
> PR review.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense
>
> Thanks,
> Justine
>
> On Fri, Feb 3, 2023 at 8:40 AM Justine Olshan  wrote:
>
> > Thanks everyone! I'm going to close the vote.
> > The KIP is accepted with five binding votes from Jason, Guozhang,
> > Matthias, David (and me), and two non-binding votes from Colt and Artem.
> >
> > Thanks again,
> > Justine
> >
> > On Thu, Feb 2, 2023 at 11:41 PM David Jacot 
> > wrote:
> >
> >> Thanks for the KIP, Justine. +1 (binding)
> >>
> >> On Fri, Feb 3, 2023 at 1:36 AM Matthias J. Sax  wrote:
> >>
> >> > Thanks for the KIP!
> >> >
> >> > +1 (binding)
> >> >
> >> >
> >> > On 2/2/23 4:18 PM, Artem Livshits wrote:
> >> > > (non-binding) +1.  Looking forward to the implementation and fixing
> >> the
> >> > > issues that we've got.
> >> > >
> >> > > -Artem
> >> > >
> >> > > On Mon, Jan 23, 2023 at 2:25 PM Guozhang Wang <
> >> > guozhang.wang...@gmail.com>
> >> > > wrote:
> >> > >
> >> > >> Thanks Justine, I have no further comments on the KIP. +1.
> >> > >>
> >> > >> On Tue, Jan 17, 2023 at 10:34 AM Jason Gustafson
> >> > >>  wrote:
> >> > >>>
> >> > >>> +1. Thanks Justine!
> >> > >>>
> >> > >>> -Jason
> >> > >>>
> >> > >>> On Tue, Jan 10, 2023 at 3:46 PM Colt McNealy 
> >> > >> wrote:
> >> > >>>
> >> > >>>> (non-binding) +1. Thank you for the KIP, Justine! I've read it; it
> >> > >> makes
> >> > >>>> sense to me and I am excited for the implementation.
> >> > >>>>
> >> > >>>> Colt McNealy
> >> > >>>> *Founder, LittleHorse.io*
> >> > >>>>
> >> > >>>>
> >> > >>>> On Tue, Jan 10, 2023 at 10:46 AM Justine Olshan
> >> > >>>>  wrote:
> >> > >>>>
> >> > >>>>> Hi everyone,
> >> > >>>>>
> >> > >>>>> I would like to start a vote on KIP-890 which aims to prevent some
> >> > >> of the
> >> > >>>>> common causes of hanging transactions and make other general
> >> > >> improvements
> >> > >>>>> to transactions in Kafka.
> >> > >>>>>
> >> > >>>>>
> >> > >>>>>
> >> > >>>>
> >> > >>
> >> >
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense
> >> > >>>>>
> >> > >>>>> Please take a look if you haven't already and vote!
> >> > >>>>>
> >> > >>>>> Justine
> >> > >>>>>
> >> > >>>>
> >> > >>
> >> > >
> >> >
> >>
> >


[jira] [Resolved] (KAFKA-14253) StreamsPartitionAssignor should print the member count in assignment logs

2023-02-16 Thread Guozhang Wang (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-14253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-14253.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> StreamsPartitionAssignor should print the member count in assignment logs
> -
>
> Key: KAFKA-14253
> URL: https://issues.apache.org/jira/browse/KAFKA-14253
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Christopher Pooya Razavian
>Priority: Minor
>  Labels: newbie, newbie++
> Fix For: 3.5.0
>
>
> Debugging rebalance and assignment issues is harder than it needs to be. One 
> simple thing that can help is to print out information in the logs that users 
> have to compute today.
> For example, the StreamsPartitionAssignor prints two messages that contain 
> the newline-delimited group membership:
> {code:java}
> [StreamsPartitionAssignor] [...-StreamThread-1] stream-thread 
> [...-StreamThread-1-consumer] All members participating in this rebalance:
> : []
> : []
> : []{code}
> and
> {code:java}
> [StreamsPartitionAssignor] [...-StreamThread-1] stream-thread 
> [...-StreamThread-1-consumer] Assigned tasks [...] including stateful [...] 
> to clients as:
> =[activeTasks: ([...]) standbyTasks: ([...])]
> =[activeTasks: ([...]) standbyTasks: ([...])]
> =[activeTasks: ([...]) standbyTasks: ([...])]
> {code}
>  
> In both of these cases, it would be nice to:
>  # Include the number of members in the group (i.e., "15 members 
> participating" and "to 15 clients as")
>  # Sort the member ids (to help compare the membership and assignment across 
> rebalances)
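
The two requests above can be sketched as follows. This is only an illustration: the class `AssignmentLogSketch` and the method `describeMembers` are hypothetical names and are not part of `StreamsPartitionAssignor`, and the message wording is adapted from the log excerpt in the ticket.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not the actual StreamsPartitionAssignor code.
final class AssignmentLogSketch {
    private AssignmentLogSketch() {}

    // Builds a log message that leads with the member count and lists
    // the member ids in sorted order, one per line.
    static String describeMembers(Collection<String> memberIds) {
        List<String> sorted = new ArrayList<>(memberIds);
        Collections.sort(sorted); // stable order makes rebalances easy to diff
        StringBuilder sb = new StringBuilder();
        sb.append(sorted.size()).append(" members participating in this rebalance:");
        for (String id : sorted) {
            sb.append('\n').append(id);
        }
        return sb.toString();
    }
}
```

Sorting a copy leaves the caller's collection untouched, which matters if the same collection is later used for the assignment itself.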





Re: [ANNOUNCE] New committer: Lucas Bradstreet

2023-02-16 Thread Guozhang Wang
Congratulations, Lucas!

On Thu, Feb 16, 2023 at 3:18 PM Kowshik Prakasam  wrote:
>
> Congratulations, Lucas!
>
>
> Cheers,
> Kowshik
>
> On Thu, Feb 16, 2023, 2:07 PM Justine Olshan 
> wrote:
>
> > Congratulations Lucas!
> >
> > Thanks for your mentorship on some of my KIPs as well :)
> >
> > On Thu, Feb 16, 2023 at 1:56 PM Jun Rao  wrote:
> >
> > > Hi, Everyone,
> > >
> > > The PMC of Apache Kafka is pleased to announce a new Kafka committer
> > Lucas
> > > Bradstreet.
> > >
> > > Lucas has been a long-time Kafka contributor since Oct. 2018. He has been
> > > extremely valuable for Kafka on both performance and correctness
> > > improvements.
> > >
> > > The following are his performance related contributions.
> > >
> > > KAFKA-9820: validateMessagesAndAssignOffsetsCompressed allocates batch
> > > iterator which is not used
> > > KAFKA-9685: Solve Set concatenation perf issue in AclAuthorizer
> > > KAFKA-9729: avoid readLock in authorizer ACL lookups
> > > KAFKA-9039: Optimize ReplicaFetcher fetch path
> > > KAFKA-8841: Reduce overhead of ReplicaManager.updateFollowerFetchState
> > >
> > > The following are his correctness related contributions.
> > >
> > > KAFKA-13194: LogCleaner may clean past highwatermark
> > > KAFKA-10432: LeaderEpochCache is incorrectly recovered on segment
> > recovery
> > > for epoch 0
> > > KAFKA-9137: Fix incorrect FetchSessionCache eviction logic
> > >
> > > Congratulations, Lucas!
> > >
> > > Thanks,
> > >
> > > Jun (on behalf of the Apache Kafka PMC)
> > >
> >


Re: Requesting permissions to contribute to Apache Kafka.

2023-02-15 Thread Guozhang Wang
Hello Zhangheng,

I've granted you permission on the Apache Kafka wiki space. You
should be able to create new KIP pages.


Guozhang

On Wed, Feb 15, 2023 at 6:02 AM hzh0425  wrote:
>
> Hello,
>
> My name is Zhangheng Huang. I want to start contributing to the Apache Kafka 
> project and would like permission to create KIPs.
>
> JiraID: hzh0425
> WikiID: hzh0425
> email: hzh0...@apache.org
>
> Best regards,


Re: Re: [DISCUSS] KIP-904 Kafka Streams - Guarantee subtractor is called before adder if key has not changed

2023-02-14 Thread Guozhang Wang
Thanks Farooq, that looks good to me.

Guozhang

On Sun, Feb 12, 2023 at 9:01 AM Dharin Shah  wrote:
>
> Hello Farooq,
>
> This is actually a great idea; we have dealt with this by using an array
> instead of a set.
> +1 to this :)
>
> Thank you,
> Dharin
>
> On Sun, Feb 12, 2023 at 8:11 PM Fq Public  wrote:
>
> > Hi Guozhang,
> >
> > Thanks for reading over my proposal!
> >
> > > Regarding the format, I'm wondering whether we can change the type of
> > "INT newDataLength" to UINT32?
> >
> > Good idea, I've updated the KIP to reflect UINT32 since it makes clear the
> > value can never be less than zero.
> >
> > > The default `.equals` implementation on Object is by reference, so if the
> > groupBy did not generate a new object, the check may still pass. This means that
> > even if the user does not implement the `.equals` function, if the same object
> > is returned then this feature would still be triggered, is that correct?
> >
> > Correct, I've updated the KIP to call out this edge-case clearly as
> > follows:
> >
> > > Since the default `.equals` implementation for an `Object` is by
> > reference, if a user's `groupBy` returns the same reference for the key,
> > then the oldKey and the newKey will naturally be `.equals` to each other. This
> > will result in a single event being sent to the repartition topic. This
> > change in behaviour should be considered a "bug-fix" rather than a
> > "breaking change": the semantics of the operation remain unchanged, and the
> > only thing that changes for users is that they no longer see transient
> > "inconsistent" states. In the worst case, users in this situation will
> > need to update any strict tests that check specifically for the presence of
> > transient "inconsistent" states.
> >
> > What do you think?
> >
> > Thanks,
> > Farooq
> >
> > On 2023/02/07 18:02:24 Guozhang Wang wrote:
> > > Hello Farooq,
> > >
> > > Thanks for the very detailed proposal! I think this is a great idea.
> > > Just a few thoughts:
> > >
> > > 1. I regret that we over-optimized the Changed serde format for
> > > footprint while making it less extensible. It seems to me that a
> > > two-rolling-bounce migration is unavoidable. Regarding the format, I'm
> > > wondering whether we can change the type of "INT newDataLength" to
> > > UINT32?
> > >
> > > 2. The default `.equals` implementation on Object is by reference, so if
> > > the groupBy did not generate a new object, the check may still pass. This
> > > means that even if the user does not implement the `.equals` function, if
> > > the same object is returned then this feature would still be
> > > triggered, is that correct?
> > >
> > >
> > > Guozhang
> > >
> > > On Mon, Feb 6, 2023 at 5:05 AM Fq Public  wrote:
> > > >
> > > > Hi everyone,
> > > >
> > > > I'd like to share a new KIP for discussion:
> > > > https://cwiki.apache.org/confluence/x/P5VbDg
> > > >
> > > > This could be considered mostly as a "bug fix" but we wanted to raise
> > a KIP
> > > > for discussion because it involves changes to the serialization format
> > of
> > > > an internal topic which raises backward compatibility considerations.
> > > >
> > > > Please take a look and let me know what you think.
> > > >
> > > > Thanks,
> > > > Farooq
> > >


Re: [DISCUSS] KIP-904 Kafka Streams - Guarantee subtractor is called before adder if key has not changed

2023-02-07 Thread Guozhang Wang
Hello Farooq,

Thanks for the very detailed proposal! I think this is a great idea.
Just a few thoughts:

1. I regret that we over-optimized the Changed serde format for
footprint while making it less extensible. It seems to me that a
two-rolling-bounce migration is unavoidable. Regarding the format, I'm
wondering whether we can change the type of "INT newDataLength" to
UINT32?
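
To make the INT versus UINT32 point concrete, here is a small sketch of how a 32-bit unsigned length could be written and read in Java, which has no unsigned int type. The class and method names (`UnsignedLengthSketch`, `writeUint32`, `readUint32`) are hypothetical, and this is not the actual Changed serde code; it only shows that the same four bytes can carry a length that is never negative.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating a UINT32 length field; not Kafka Streams code.
final class UnsignedLengthSketch {
    private static final long MAX_UINT32 = 0xFFFFFFFFL;

    private UnsignedLengthSketch() {}

    // Stores the low 32 bits of value; the range check keeps the round trip lossless.
    static void writeUint32(ByteBuffer buf, long value) {
        if (value < 0 || value > MAX_UINT32) {
            throw new IllegalArgumentException("Out of UINT32 range: " + value);
        }
        buf.putInt((int) value);
    }

    // Reinterprets the stored 32 bits as an unsigned value in a long.
    static long readUint32(ByteBuffer buf) {
        return Integer.toUnsignedLong(buf.getInt());
    }
}
```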

2. The default `.equals` implementation on Object is by reference, so if
the groupBy did not generate a new object, the check may still pass. This
means that even if the user does not implement the `.equals` function, if
the same object is returned then this feature would still be
triggered, is that correct?
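
The reference-equality edge case in point 2 can be shown with a few lines of Java. `PlainKey` and `keyUnchanged` are hypothetical names used only for this sketch; the comparison stands in for the old-key/new-key check discussed in the KIP and is not the Streams implementation.

```java
// Illustrative only: shows how Object.equals behaves when it is not overridden.
final class ReferenceEqualsSketch {
    private ReferenceEqualsSketch() {}

    // Hypothetical key type that does NOT override equals or hashCode.
    static final class PlainKey {
        final String id;

        PlainKey(String id) {
            this.id = id;
        }
    }

    // Stand-in for the old-key/new-key comparison described above.
    static boolean keyUnchanged(Object oldKey, Object newKey) {
        return oldKey.equals(newKey);
    }
}
```

With this type, passing the same instance twice reports an unchanged key, while two distinct `PlainKey` instances with the same `id` do not. A `String` key, which overrides `equals`, compares by content either way.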


Guozhang

On Mon, Feb 6, 2023 at 5:05 AM Fq Public  wrote:
>
> Hi everyone,
>
> I'd like to share a new KIP for discussion:
> https://cwiki.apache.org/confluence/x/P5VbDg
>
> This could be considered mostly as a "bug fix" but we wanted to raise a KIP
> for discussion because it involves changes to the serialization format of
> an internal topic which raises backward compatibility considerations.
>
> Please take a look and let me know what you think.
>
> Thanks,
> Farooq


Re: [VOTE] KIP-869: Improve Streams State Restoration Visibility

2023-01-26 Thread Guozhang Wang
Thanks for all the very helpful discussions, I'm closing the vote with
a tally here:

+1: 7 (Nick, John, Walker, Bruno, Lucas, Matthias, Guozhang), with 5
binding votes and 2 non-binding votes.
-1: 0


Guozhang

On Wed, Jan 25, 2023 at 5:48 PM Matthias J. Sax  wrote:
>
> Thanks!
>
> +1 (binding)
>
> -Matthias
>
> On 1/24/23 1:17 PM, Guozhang Wang wrote:
> > Hi Matthias:
> >
> > re "paused" -> "suspended": I got your point now, thanks. Just to
> > clarify, the two are a bit different: "paused" tasks are paused
> > because the topology is paused, i.e. from KIP-834; whereas
> > "suspended" tasks are restoring tasks that are removed before
> > restoration completes due to a follow-up rebalance, and this is to
> > distinguish this case from "onRestoreEnd", as described in KAFKA-10575. A
> > suspended task is no longer owned by the thread and hence there's no
> > need to measure the number of such tasks.
> >
> > re: "restore-ratio": that's a good point. I'd like it to function in the
> > same way as the "records-rate" metrics. Will update the wiki.
> >
> > re: making "restore-remaining-records-total" at INFO level: sounds
> > good to me too. I will also update the metric name a bit to be more
> > specific.
> >
> >
> >
> > On Thu, Jan 19, 2023 at 2:35 PM Guozhang Wang
> >  wrote:
> >>
> >> Hello Matthias,
> >>
> >> Thanks for the feedback. I was on vacation for a while. Pardon the
> >> late replies. Please see them inline below.
> >>
> >> On Thu, Dec 1, 2022 at 11:23 PM Matthias J. Sax  wrote:
> >>>
> >>> Seems I am late to the party... Great KIP. Couple of questions from my 
> >>> side:
> >>>
> >>> (1) What is the purpose of `standby-updating-tasks`? It seems to be the
> >>> same as the number of assigned standby task? Not sure how useful it
> >>> would be?
> >>>
> >> In general, yes, it is the number of assigned standby tasks --- there
> >> will be transient periods when the assigned standby tasks are not yet
> >> being updated, but these would not last long --- but until now we have
> >> not had a direct gauge to expose this, and users have to infer it from
> >> other indirect metrics.
> >>
> >>>
> >>>
> >>> (2) `active-paused-tasks` / `standby-paused-tasks` -- what does "paused"
> >>> exactly mean? There was a discussion about renaming the callback method
> >>> from pause to suspended. So should this be called `suspended`, too? And
> >>> if yes, how is it useful for users?
> >>>
> >> Pausing here refers to "KIP-834: Pause / Resume KafkaStreams
> >> Topologies" 
> >> (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832).
> >> When a topology is paused, all its tasks including standbys will be
> >> paused too.
> >>
> >> I'm not aware of a discussion to rename the call name to "suspend" for
> >> KIP-834. Could you point me to the reference?
> >>
> >>>
> >>>
> >>> (3) `restore-ratio`: the description says
> >>>
> >>>> The fraction of time the thread spent on restoring active or standby 
> >>>> tasks
> >>>
> >>> I find the term "restoring" does only apply to active tasks, but not to
> >>> standbys. Can we reword this?
> >>>
> >> Yeah I have been discussing this with others in the community a bit as
> >> well, but so far I have not been convinced that there is a better name.
> >> Some other alternatives that were discussed, but did not win everyone
> >> over, are "restore-or-update-ratio", "process-ratio" (for the restore
> >> thread that means restoring or updating), and "io-ratio".
> >>
> >> The only one so far that I feel is probably better, is
> >> "state-update-ratio". If folks feel this one is better than
> >> "restore-ratio" I'm happy to update.
> >>
> >>>
> >>> (4) `restore-call-rate`: not sure what you exactly mean by "restore 
> >>> calls"?
> >>>
> >> This is similar to the "io-calls-rate" in the selector classes, i.e.
> >> the number of "restore" function calls made. It's arguably a very
> >> low-level metric but I included it since it could be useful in some
> >> debugging 

Re: [VOTE] KIP-869: Improve Streams State Restoration Visibility

2023-01-24 Thread Guozhang Wang
Hi Matthias:

re "paused" -> "suspended": I got your point now, thanks. Just to
clarify, the two are a bit different: "paused" tasks are paused
because the topology is paused, i.e. from KIP-834; whereas
"suspended" tasks are restoring tasks that are removed before
restoration completes due to a follow-up rebalance, and this is to
distinguish this case from "onRestoreEnd", as described in KAFKA-10575. A
suspended task is no longer owned by the thread and hence there's no
need to measure the number of such tasks.

re: "restore-ratio": that's a good point. I'd like it to function in the
same way as the "records-rate" metrics. Will update the wiki.

re: making "restore-remaining-records-total" at INFO level: sounds
good to me too. I will also update the metric name a bit to be more
specific.



On Thu, Jan 19, 2023 at 2:35 PM Guozhang Wang
 wrote:
>
> Hello Matthias,
>
> Thanks for the feedback. I was on vacation for a while. Pardon the
> late replies. Please see them inline below.
>
> On Thu, Dec 1, 2022 at 11:23 PM Matthias J. Sax  wrote:
> >
> > Seems I am late to the party... Great KIP. Couple of questions from my side:
> >
> > (1) What is the purpose of `standby-updating-tasks`? It seems to be the
> > same as the number of assigned standby task? Not sure how useful it
> > would be?
> >
> In general, yes, it is the number of assigned standby tasks --- there
> will be transient periods when the assigned standby tasks are not yet
> being updated, but these would not last long --- but until now we have
> not had a direct gauge to expose this, and users have to infer it from
> other indirect metrics.
>
> >
> >
> > (2) `active-paused-tasks` / `standby-paused-tasks` -- what does "paused"
> > exactly mean? There was a discussion about renaming the callback method
> > from pause to suspended. So should this be called `suspended`, too? And
> > if yes, how is it useful for users?
> >
> Pausing here refers to "KIP-834: Pause / Resume KafkaStreams
> Topologies" 
> (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832).
> When a topology is paused, all its tasks including standbys will be
> paused too.
>
> I'm not aware of a discussion to rename the call name to "suspend" for
> KIP-834. Could you point me to the reference?
>
> >
> >
> > (3) `restore-ratio`: the description says
> >
> > > The fraction of time the thread spent on restoring active or standby tasks
> >
> > I find the term "restoring" does only apply to active tasks, but not to
> > standbys. Can we reword this?
> >
> Yeah I have been discussing this with others in the community a bit as
> well, but so far I have not been convinced that there is a better name.
> Some other alternatives that were discussed, but did not win everyone
> over, are "restore-or-update-ratio", "process-ratio" (for the restore
> thread that means restoring or updating), and "io-ratio".
>
> The only one so far that I feel is probably better, is
> "state-update-ratio". If folks feel this one is better than
> "restore-ratio" I'm happy to update.
>
> >
> > (4) `restore-call-rate`: not sure what you exactly mean by "restore calls"?
> >
> This is similar to the "io-calls-rate" in the selector classes, i.e.
> the number of "restore" function calls made. It's arguably a very
> low-level metric but I included it since it could be useful in some
> debugging scenarios.
>
> >
> > (5) `restore-remaining-records-total` -- why is this a task metric?
> > Seems we could roll it up into a thread metric that we report at INFO
> > level (we could still have per-task DEBUG level metric for it in addition).
> >
> The rationale behind it is the general principle in metrics design
> that "Kafka would provide the lowest necessary metrics levels, and
> users can do the roll-ups however they want".
>
> >
> > (6) What about "warmup tasks"? Internally, we treat them as standbys,
> > but it seems it's hard for users to reason about it in the scale-out
> > warm-up case. Would it be helpful (and possible) to report "warmup
> > progress" explicitly?
> >
> At the restore thread level, we cannot differentiate standby tasks
> from warmup tasks since the latter are created exactly like the
> former. But I do agree this is a visibility issue that is worth
> addressing; I think another KIP would be needed to first consider
> distinguishing these two at the class level.
>
> >
> > -Matthias
> 

Re: [VOTE] KIP-890: Transactions Server Side Defense

2023-01-23 Thread Guozhang Wang
Thanks Justine, I have no further comments on the KIP. +1.

On Tue, Jan 17, 2023 at 10:34 AM Jason Gustafson
 wrote:
>
> +1. Thanks Justine!
>
> -Jason
>
> On Tue, Jan 10, 2023 at 3:46 PM Colt McNealy  wrote:
>
> > (non-binding) +1. Thank you for the KIP, Justine! I've read it; it makes
> > sense to me and I am excited for the implementation.
> >
> > Colt McNealy
> > *Founder, LittleHorse.io*
> >
> >
> > On Tue, Jan 10, 2023 at 10:46 AM Justine Olshan
> >  wrote:
> >
> > > Hi everyone,
> > >
> > > I would like to start a vote on KIP-890 which aims to prevent some of the
> > > common causes of hanging transactions and make other general improvements
> > > to transactions in Kafka.
> > >
> > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense
> > >
> > > Please take a look if you haven't already and vote!
> > >
> > > Justine
> > >
> >


Re: [DISCUSS] KIP-890 Server Side Defense

2023-01-20 Thread Guozhang Wang
xcept for the
> fact that it hasn't been returned on Produce requests before. The error
> handling for clients is a bit vague (which is why I opened KAFKA-14439
> <https://issues.apache.org/jira/browse/KAFKA-14439>), but the decision we
> made here was to only return errors that have been previously returned to
> producers. As for not being fatal, I think part of the theory was that in
> many cases, the producer would be disconnected. (See point 1) and this
> would just be an error to return from the server. I did plan to think about
> other cases, so let me know if you think of any as well!
>
> Lots to say! Let me know if you have further thoughts!
> Justine
>
> On Fri, Jan 20, 2023 at 11:21 AM Guozhang Wang 
> wrote:
>
> > Hello Justine,
> >
> > Thanks for the great write-up! I made a quick pass through it and here
> > are some thoughts (I have not been able to read through this thread, so
> > pardon me if they overlap with or are subsumed by previous comments):
> >
> > First are some meta ones:
> >
> > 1. I think we need to also improve the client's experience once we
> > have this defence in place. More concretely, say a user's producer
> > code is like the following:
> >
> > future = producer.send();
> > // producer.flush();
> > producer.commitTransaction();
> > future.get();
> >
> > This results in the following order: a) produce-request sent by producer, b)
> > end-txn-request sent by producer, c) end-txn-response sent back, d)
> > txn-marker-request sent from coordinator to partition leader, e)
> > produce-request finally received by the partition leader. Before this
> > KIP, step e) would be accepted, causing a dangling txn; now it would be
> > rejected in step e), which is good. But from the client's point of view
> > it now becomes confusing, since `commitTransaction()` returns
> > successfully but the "future" throws an invalid-epoch error, and they
> > are not sure whether the transaction succeeded or not. In fact, it
> > "partially succeeded", with some msgs being rejected but others
> > committed successfully.
> >
> > Of course the easy way to avoid this is to always call
> > "producer.flush()" before commitTxn, and that's what we do ourselves
> > and what we recommend users do. But I suspect not everyone does it. In
> > fact I just checked the javadoc in KafkaProducer and our code snippet
> > does not include a `flush()` call. So I'm thinking maybe we can
> > enforce flushing inside the `commitTxn` code before sending the
> > end-txn request.
> >
> > 2. I'd like to clarify a few details on "just add partitions to the
> > transaction on the first produce request during a transaction". My
> > understanding is that the partition leader's cache has the producer id
> > / sequence / epoch for the latest txn, which is either ongoing or completed
> > (upon receiving the marker request from the coordinator). When a produce
> > request is received, if
> >
> > * producer's epoch < cached epoch, or producer's epoch == cached epoch
> > but the latest txn is completed, the leader directly rejects it with
> > invalid-epoch.
> > * producer's epoch > cached epoch, the leader parks the request and sends
> > an add-partitions request to the coordinator.
> >
> > In order to do this, does the coordinator need to bump the sequence and
> > reset the epoch to 0 when the next epoch is going to overflow? If there is
> > no need to do so, then how do we handle the (admittedly rare, but still
> > possible) epoch overflow situation?
> >
> > 3. I'm a bit concerned about adding a generic "ABORTABLE_ERROR" given
> > we already have a pretty messy error classification and error handling
> > on the producer clients side --- I have a summary about the issues and
> > a proposal to address this in
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-691%3A+Enhance+Transactional+Producer+Exception+Handling
> > -- I understand we do not want to use "UNKNOWN_PRODUCER_ID" anymore
> > and in fact we intend to deprecate it in KIP-360 and eventually remove
> > it; but I'm wondering whether we can still use specific error codes. E.g.
> > what about "InvalidProducerEpochException", since for new clients the
> > actual reason this would be rejected is indeed that the
> > epoch on the coordinator caused the add-partitions-request from the
> > brokers to be rejected anyway?
> >
> > 4. It seems we put the producer request into purgatory before we ever
> > append the records, while other producer's records may still be
> > appended durin

Re: [DISCUSS] KIP-890 Server Side Defense

2023-01-20 Thread Guozhang Wang
Hello Justine,

Thanks for the great write-up! I made a quick pass through it and here
are some thoughts (I have not been able to read through this thread, so
pardon me if they overlap with or are subsumed by previous comments):

First are some meta ones:

1. I think we need to also improve the client's experience once we
have this defence in place. More concretely, say a user's producer
code is like the following:

future = producer.send();
// producer.flush();
producer.commitTransaction();
future.get();

This results in the following order: a) produce-request sent by producer, b)
end-txn-request sent by producer, c) end-txn-response sent back, d)
txn-marker-request sent from coordinator to partition leader, e)
produce-request finally received by the partition leader. Before this
KIP, step e) would be accepted, causing a dangling txn; now it would be
rejected in step e), which is good. But from the client's point of view
it now becomes confusing, since `commitTransaction()` returns
successfully but the "future" throws an invalid-epoch error, and they
are not sure whether the transaction succeeded or not. In fact, it
"partially succeeded", with some msgs being rejected but others
committed successfully.

Of course the easy way to avoid this is to always call
"producer.flush()" before commitTxn, and that's what we do ourselves
and what we recommend users do. But I suspect not everyone does it. In
fact I just checked the javadoc in KafkaProducer and our code snippet
does not include a `flush()` call. So I'm thinking maybe we can
enforce flushing inside the `commitTxn` code before sending the
end-txn request.
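
The ordering problem in point 1 can be reproduced with a toy model. Everything below is hypothetical (the class `FlushBeforeCommitSketch` and its methods are invented for this sketch); it does not use or describe the real `KafkaProducer`, it has no networking, and it only models "a delayed produce request that reaches the leader after the txn marker is rejected".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: hypothetical names, no networking, not the Kafka client.
final class FlushBeforeCommitSketch {
    private final Deque<String> inFlight = new ArrayDeque<>(); // sent, not yet at the leader
    private final List<String> accepted = new ArrayList<>();
    private final List<String> rejected = new ArrayList<>();
    private boolean markerWritten = false;

    // Step a): the produce request leaves the producer but is delayed.
    void send(String record) {
        inFlight.add(record);
    }

    // Step e): every delayed produce request finally reaches the leader.
    void flush() {
        while (!inFlight.isEmpty()) {
            String record = inFlight.poll();
            if (markerWritten) {
                rejected.add(record); // arrived after the txn marker
            } else {
                accepted.add(record);
            }
        }
    }

    // Steps b) to d): the marker is written while sends may still be delayed.
    void commitWithoutFlush() {
        markerWritten = true;
    }

    // The suggestion above: flush first, then end the transaction.
    void commitWithFlush() {
        flush();
        markerWritten = true;
    }

    List<String> accepted() {
        return accepted;
    }

    List<String> rejected() {
        return rejected;
    }
}
```

In this model, `send` followed by `commitWithoutFlush` and a late `flush` leaves the record in `rejected()`, while `send` followed by `commitWithFlush` leaves it in `accepted()`.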

2. I'd like to clarify a few details on "just add partitions to the
transaction on the first produce request during a transaction". My
understanding is that the partition leader's cache has the producer id
/ sequence / epoch for the latest txn, which is either ongoing or completed
(upon receiving the marker request from the coordinator). When a produce
request is received, if

* producer's epoch < cached epoch, or producer's epoch == cached epoch
but the latest txn is completed, the leader directly rejects it with
invalid-epoch.
* producer's epoch > cached epoch, the leader parks the request and sends
an add-partitions request to the coordinator.

In order to do this, does the coordinator need to bump the sequence and
reset the epoch to 0 when the next epoch is going to overflow? If there is
no need to do so, then how do we handle the (admittedly rare, but still
possible) epoch overflow situation?
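
The two bullets in point 2 can be written down as a small decision function. This is a sketch of my reading of the email, not broker code: the class `EpochCheckSketch`, the enum values, and the parameter names are all hypothetical. The final branch (same epoch with the txn still ongoing leads to a plain append) is an assumption, since the email does not spell that case out, and epoch overflow is not modelled.

```java
// Hypothetical decision function mirroring the two bullets above; not broker code.
final class EpochCheckSketch {
    enum Decision {
        REJECT_INVALID_EPOCH,     // stale epoch, or same epoch after the txn completed
        VERIFY_WITH_COORDINATOR,  // park the request and send add-partitions
        APPEND                    // assumed: same epoch and the txn is still ongoing
    }

    private EpochCheckSketch() {}

    static Decision onProduce(int producerEpoch, int cachedEpoch, boolean latestTxnCompleted) {
        if (producerEpoch < cachedEpoch) {
            return Decision.REJECT_INVALID_EPOCH;
        }
        if (producerEpoch == cachedEpoch && latestTxnCompleted) {
            return Decision.REJECT_INVALID_EPOCH;
        }
        if (producerEpoch > cachedEpoch) {
            return Decision.VERIFY_WITH_COORDINATOR;
        }
        // Assumption, not stated in the email: equal epochs, txn still open.
        return Decision.APPEND;
    }
}
```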

3. I'm a bit concerned about adding a generic "ABORTABLE_ERROR" given
we already have a pretty messy error classification and error handling
on the producer clients side --- I have a summary about the issues and
a proposal to address this in
https://cwiki.apache.org/confluence/display/KAFKA/KIP-691%3A+Enhance+Transactional+Producer+Exception+Handling
-- I understand we do not want to use "UNKNOWN_PRODUCER_ID" anymore
and in fact we intend to deprecate it in KIP-360 and eventually remove
it; but I'm wondering whether we can still use specific error codes. E.g.
what about "InvalidProducerEpochException", since for new clients the
actual reason this would be rejected is indeed that the
epoch on the coordinator caused the add-partitions-request from the
brokers to be rejected anyway?

4. It seems we put the producer request into purgatory before we ever
append the records, while other producers' records may still be
appended during that time; and that may result in some
re-ordering compared with reception order. I'm not super concerned
about it since Kafka does not guarantee reception ordering across
producers anyway, but it may make the timestamps of records inside a
partition more out of order. Are we aware of any scenarios,
such as future enhancements to log compaction, that may be affected by
this effect?

Below are just minor comments:

5. In "AddPartitionsToTxnTransaction" field of
"AddPartitionsToTxnRequest" RPC, the versions of those inner fields
are "0-3" while I thought they should be "0+" still?

6. Regarding "we can place the request in a purgatory of sorts and
check if there is any state for the transaction on the broker": I
think at this time, when we just do the checks against the cached
state, we do not need to put the request into purgatory yet?

7. This is related to 3) above. I feel using "InvalidRecordException"
for older clients may also be a bit confusing, and also it is not
fatal -- for old clients, it is better for it to be fatal, since this
indicates the client is doing something wrong and hence it should be closed.
And in general I'd prefer to use error codes with slightly more specific
meanings for clients. That being said, I also feel
"InvalidProducerEpochException" is not suitable for old versioned
clients, and we'd have to pick one that old clients recognize. I'd
prefer "InvalidTxnStateException" but that one is supposed to be
returned from txn coordinators only today. I'd suggest we do a quick
check in the current client's code path and 

[jira] [Created] (KAFKA-14641) Cleanup CommitNeeded after EOS-V1 is removed

2023-01-19 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-14641:
-

 Summary: Cleanup CommitNeeded after EOS-V1 is removed
 Key: KAFKA-14641
 URL: https://issues.apache.org/jira/browse/KAFKA-14641
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang


This is a follow-up of KAFKA-14294.

Today we have several flags to determine if KS needs to execute a commit: 1) 
task-level "commitNeeded" which is set whenever process() or punctuator() is 
called, 2) if there are input topic offsets to commit, retrieved from the 
"task.prepareCommit()", 3) the "transactionInFlight" flag from producer as a 
fix of KAFKA-14294 (this subsumes the first "commitNeeded" functionality).

Given that we still have EOS-v1, cleaning this up would be a bit complex. But 
after the deprecated EOS-v1 is removed, we can clean up those controls, since 
for any commit case we would need to commit all tasks anyway, whereas in EOS-v1 
we would probably commit only a subset of tasks since they are handled by 
different producers and hence different txns.

A quick thought is the following:

1) We would not need the per-task "commitNeeded" anymore.
2) We would maintain a single "commitNeeded" flag on the task-executor, hence 
on the thread level. It is set whenever `process()` or `punctuator()` is called.
3) Whenever we need to commit, either a) periodically, b) upon revocation, c) 
upon user request, we simply check that flag, and if necessary commit all tasks 
and reset the flag.
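
A minimal sketch of the thread-level flag described in 1) to 3), written in
Python for brevity rather than in the Streams Java code. The `Task` and
`TaskExecutor` classes and all of their methods are hypothetical stand-ins,
not actual Kafka Streams classes:

```python
class Task:
    """Hypothetical stand-in for a Streams task (not a Kafka class)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.commits = 0  # counts how often this task was committed

    def process(self, record):
        pass  # real record processing elided

    def commit(self):
        self.commits += 1


class TaskExecutor:
    """Hypothetical thread-level executor holding one commit_needed flag."""

    def __init__(self, tasks):
        self.tasks = list(tasks)
        self.commit_needed = False  # single flag, no per-task flags

    def process(self, task, record):
        task.process(record)
        self.commit_needed = True

    def punctuate(self, task):
        # Punctuation logic elided; it marks the thread as dirty as well.
        self.commit_needed = True

    def maybe_commit(self):
        """Called periodically, upon revocation, or upon user request."""
        if not self.commit_needed:
            return False
        for task in self.tasks:  # commit all tasks, not a subset
            task.commit()
        self.commit_needed = False
        return True
```

Under this assumption every commit covers all tasks owned by the thread, so
no per-task bookkeeping is required.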



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-869: Improve Streams State Restoration Visibility

2023-01-19 Thread Guozhang Wang
Hello Matthias,

Thanks for the feedback. I was on vacation for a while. Pardon the
late replies; please see them inline below.

On Thu, Dec 1, 2022 at 11:23 PM Matthias J. Sax  wrote:
>
> Seems I am late to the party... Great KIP. Couple of questions from my side:
>
> (1) What is the purpose of `standby-updating-tasks`? It seems to be the
> same as the number of assigned standby task? Not sure how useful it
> would be?
>
In general, yes, it is the number of assigned standby tasks --- there
will be transient periods when the assigned standby tasks are not yet
being updated but they would not last long --- but we did not have a
direct gauge exposing this before, and users had to infer it from
other indirect metrics.

>
>
> (2) `active-paused-tasks` / `standby-paused-tasks` -- what does "paused"
> exactly mean? There was a discussion about renaming the callback method
> from pause to suspended. So should this be called `suspended`, too? And
> if yes, how is it useful for users?
>
Pausing here refers to "KIP-834: Pause / Resume KafkaStreams
Topologies" 
(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832).
When a topology is paused, all its tasks including standbys will be
paused too.

I'm not aware of a discussion to rename the callback to "suspend" for
KIP-834. Could you point me to the reference?

>
>
> (3) `restore-ratio`: the description says
>
> > The fraction of time the thread spent on restoring active or standby tasks
>
> I find the term "restoring" does only apply to active tasks, but not to
> standbys. Can we reword this?
>
Yeah I have been discussing this with others in the community a bit as
well, but so far I have not been convinced that there is a better name.
Some other alternatives that were discussed but did not win everyone
over are "restore-or-update-ratio", "process-ratio" (for the restore
thread that means restoring or updating), and "io-ratio".

The only one so far that I feel is probably better, is
"state-update-ratio". If folks feel this one is better than
"restore-ratio" I'm happy to update.

>
> (4) `restore-call-rate`: not sure what you exactly mean by "restore calls"?
>
This is similar to the "io-calls-rate" in the selector classes, i.e.
the number of "restore" function calls made. It's arguably a very
low-level metric but I included it since it could be useful in some
debugging scenarios.

>
> (5) `restore-remaining-records-total` -- why is this a task metric?
> Seems we could roll it up into a thread metric that we report at INFO
> level (we could still have per-task DEBUG level metric for it in addition).
>
The rationale behind it is the general principle in metrics design
that "Kafka would provide the lowest necessary metrics levels, and
users can do the roll-ups however they want".
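
As a rough illustration of such a user-side roll-up, here is a small Python
sketch. The input shape (a dict keyed by thread id and task id holding the
remaining-record count per task) and the function name are assumptions made
for the example, not how Kafka's metrics reporters expose the data:

```python
from collections import defaultdict


def roll_up_by_thread(task_metrics):
    """Sum per-task remaining-record counts into per-thread totals.

    task_metrics maps (thread_id, task_id) -> remaining records; this
    layout is hypothetical and chosen only for illustration.
    """
    totals = defaultdict(int)
    for (thread_id, _task_id), remaining in task_metrics.items():
        totals[thread_id] += remaining
    return dict(totals)
```

The same aggregation could be done in whatever monitoring system collects
the task-level values.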

>
> (6) What about "warmup tasks"? Internally, we treat them as standbys,
> but it seems it's hard for users to reason about it in the scale-out
> warm-up case. Would it be helpful (and possible) to report "warmup
> progress" explicitly?
>
At the restore thread level, we cannot differentiate standby tasks
from warmup tasks since the latter are created exactly like the
former. But I do agree this is a visibility issue that is worth
addressing; I think another KIP would be needed to first consider
distinguishing these two at the class level.

>
> -Matthias
>
>
> On 11/1/22 2:44 AM, Lucas Brutschy wrote:
> > We need this!
> >
> > + 1 non binding
> >
> > Cheers,
> > Lucas
> >
> > On Tue, Nov 1, 2022 at 10:01 AM Bruno Cadonna  wrote:
> >>
> >> Guozhang,
> >>
> >> Thanks for the KIP!
> >>
> >> +1 (binding)
> >>
> >> Best,
> >> Bruno
> >>
> >> On 25.10.22 22:07, Walker Carlson wrote:
> >>> +1 non binding
> >>>
> >>> Thanks for the kip!
> >>>
> >>> On Thu, Oct 20, 2022 at 10:25 PM John Roesler  wrote:
> >>>
> >>>> Thanks for the KIP, Guozhang!
> >>>>
> >>>> I'm +1 (binding)
> >>>>
> >>>> -John
> >>>>
> >>>> On Wed, Oct 12, 2022, at 16:36, Nick Telford wrote:
> >>>>> Can't wait!
> >>>>> +1 (non-binding)
> >>>>>
> >>>>> On Wed, 12 Oct 2022, 18:02 Guozhang Wang, 
> >>>>> wrote:
> >>>>>
> >>>>>> Hello all,
> >>>>>>
> >>>>>> I'd like to start a vote for the following KIP, aiming to improve Kafka
> >>>>>> Stream's restoration visibility via new metrics and callback methods:
> >>>>>>
> >>>>>>
> >>>>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility
> >>>>>>
> >>>>>>
> >>>>>> Thanks!
> >>>>>> -- Guozhang
> >>>>>>
> >>>>
> >>>


Re: [ANNOUNCE] New committer: Josep Prat

2023-01-17 Thread Guozhang Wang
Congratulations, Josep!

On Tue, Jan 3, 2023 at 11:23 AM Josep Prat 
wrote:

> Thanks all again! :)
>
> On Tue, Jan 3, 2023 at 6:19 PM Bill Bejeck 
> wrote:
>
> > Congratulations, Josep!
> >
> > -Bill
> >
> > On Tue, Dec 20, 2022 at 9:03 PM Luke Chen  wrote:
> >
> > > Congratulations, Josep!
> > >
> > > Luke
> > >
> > > On Wed, Dec 21, 2022 at 6:26 AM Viktor Somogyi-Vass
> > >  wrote:
> > >
> > > > Congrats Josep!
> > > >
> > > > On Tue, Dec 20, 2022, 21:56 Matthias J. Sax 
> wrote:
> > > >
> > > > > Congrats!
> > > > >
> > > > > On 12/20/22 12:01 PM, Josep Prat wrote:
> > > > > > Thank you all!
> > > > > >
> > > > > > ———
> > > > > > Josep Prat
> > > > > >
> > > > > > Aiven Deutschland GmbH
> > > > > >
> > > > > > Immanuelkirchstraße 26, 10405 Berlin
> > > > > >
> > > > > > Amtsgericht Charlottenburg, HRB 209739 B
> > > > > >
> > > > > > Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> > > > > >
> > > > > > m: +491715557497
> > > > > >
> > > > > > w: aiven.io
> > > > > >
> > > > > > e: josep.p...@aiven.io
> > > > > >
> > > > > > On Tue, Dec 20, 2022, 20:42 Bill Bejeck 
> wrote:
> > > > > >
> > > > > >> Congratulations Josep!
> > > > > >>
> > > > > >> -Bill
> > > > > >>
> > > > > >> On Tue, Dec 20, 2022 at 1:11 PM Mickael Maison <
> > > > > mickael.mai...@gmail.com>
> > > > > >> wrote:
> > > > > >>
> > > > > >>> Congratulations Josep!
> > > > > >>>
> > > > > >>> On Tue, Dec 20, 2022 at 6:55 PM Bruno Cadonna <
> > cado...@apache.org>
> > > > > >> wrote:
> > > > > 
> > > > >  Congrats, Josep!
> > > > > 
> > > > >  Well deserved!
> > > > > 
> > > > >  Best,
> > > > >  Bruno
> > > > > 
> > > > >  On 20.12.22 18:40, Kirk True wrote:
> > > > > > Congrats Josep!
> > > > > >
> > > > > > On Tue, Dec 20, 2022, at 9:33 AM, Jorge Esteban Quilcate
> Otoya
> > > > wrote:
> > > > > >> Congrats Josep!!
> > > > > >>
> > > > > >> On Tue, 20 Dec 2022, 17:31 Greg Harris,
> > > > > >>  > > > > 
> > > > > >> wrote:
> > > > > >>
> > > > > >>> Congratulations Josep!
> > > > > >>>
> > > > > >>> On Tue, Dec 20, 2022 at 9:29 AM Chris Egerton <
> > > > > >>> fearthecel...@gmail.com>
> > > > > >>> wrote:
> > > > > >>>
> > > > >  Congrats Josep! Well-earned.
> > > > > 
> > > > >  On Tue, Dec 20, 2022, 12:26 Jun Rao
> >  > > >
> > > > > >>> wrote:
> > > > > 
> > > > > > Hi, Everyone,
> > > > > >
> > > > > > The PMC of Apache Kafka is pleased to announce a new
> Kafka
> > > > > >>> committer
> > > > >  Josep
> > > > > >Prat.
> > > > > >
> > > > > > Josep has been contributing to Kafka since May 2021. He
> > > > > >>> contributed 20
> > > > >  PRs
> > > > > > including the following 2 KIPs.
> > > > > >
> > > > > > KIP-773 Differentiate metric latency measured in ms and
> ns
> > > > > > KIP-744: Migrate TaskMetadata and ThreadMetadata to an
> > > > interface
> > > > > >>> with
> > > > > > internal implementation
> > > > > >
> > > > > > Congratulations, Josep!
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun (on behalf of the Apache Kafka PMC)
> > > > > >
> > > > > 
> > > > > >>>
> > > > > >>
> > > > > >
> > > > > >>>
> > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
> --
>
> *Josep Prat*
> Open Source Engineering Director, *Aiven*
> josep.p...@aiven.io   |   +491715557497
> aiven.io   |   https://twitter.com/aiven_io
> *Aiven Deutschland GmbH*
> Immanuelkirchstraße 26, 10405 Berlin
> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> Amtsgericht Charlottenburg, HRB 209739 B
>


Re: [ANNOUNCE] New committer: Edoardo Comar

2023-01-17 Thread Guozhang Wang
Congratulations, Edoardo!

On Tue, Jan 10, 2023 at 9:11 AM Bruno Cadonna  wrote:

> Congrats!
>
> Best,
> Bruno
>
> On 10.01.23 11:00, Edoardo Comar wrote:
> > Many thanks everyone !
> >
> > On Mon, 9 Jan 2023 at 19:40, Rajini Sivaram 
> wrote:
> >
> >> Congratulations, Edo!
> >>
> >> Regards,
> >>
> >> Rajini
> >>
> >> On Mon, Jan 9, 2023 at 10:16 AM Tom Bentley 
> wrote:
> >>
> >>> Congratulations!
> >>>
> >>> On Sun, 8 Jan 2023 at 01:14, Satish Duggana 
> >>> wrote:
> >>>
>  Congratulations, Edoardo!
> 
>  On Sun, 8 Jan 2023 at 00:15, Viktor Somogyi-Vass
>   wrote:
> >
> > Congrats Edoardo!
> >
> > On Sat, Jan 7, 2023, 18:15 Bill Bejeck  wrote:
> >
> >> Congratulations, Edoardo!
> >>
> >> -Bill
> >>
> >> On Sat, Jan 7, 2023 at 12:11 PM John Roesler 
>  wrote:
> >>
> >>> Congrats, Edoardo!
> >>> -John
> >>>
> >>> On Fri, Jan 6, 2023, at 20:47, Matthias J. Sax wrote:
>  Congrats!
> 
>  On 1/6/23 5:15 PM, Luke Chen wrote:
> > Congratulations, Edoardo!
> >
> > Luke
> >
> > On Sat, Jan 7, 2023 at 7:58 AM Mickael Maison <
> >> mickael.mai...@gmail.com
> 
> > wrote:
> >
> >> Congratulations Edo!
> >>
> >>
> >> On Sat, Jan 7, 2023 at 12:05 AM Jun Rao
> >>>  >
> >>> wrote:
> >>>
> >>> Hi, Everyone,
> >>>
> >>> The PMC of Apache Kafka is pleased to announce a new Kafka
>  committer
> >> Edoardo
> >>> Comar.
> >>>
> >>> Edoardo has been a long time Kafka contributor since 2016.
> >> His
>  major
> >>> contributions are the following.
> >>>
> >>> KIP-302: Enable Kafka clients to use all DNS resolved IP
>  addresses
> >>> KIP-277: Fine Grained ACL for CreateTopics API
> >>> KIP-136: Add Listener name to SelectorMetrics tags
> >>>
> >>> Congratulations, Edoardo!
> >>>
> >>> Thanks,
> >>>
> >>> Jun (on behalf of the Apache Kafka PMC)
> >>
> >
> >>>
> >>
> 
> 
> >>>
> >>
> >
>


Re: [ANNOUNCE] New committer: Satish Duggana

2023-01-17 Thread Guozhang Wang
Congratulations, Satish!

On Tue, Jan 10, 2023 at 11:31 AM Rajini Sivaram 
wrote:

> Congratulations, Satish!
>
> Regards,
>
> Rajini
>
> On Tue, Jan 10, 2023 at 5:12 PM Bruno Cadonna  wrote:
>
> > Congrats!
> >
> > Best,
> > Bruno
> >
> > On 24.12.22 12:44, Manikumar wrote:
> > > Congrats, Satish!  Well deserved.
> > >
> > > On Sat, Dec 24, 2022, 5:10 PM Tom Bentley  wrote:
> > >
> > >> Congratulations!
> > >>
> > >> On Sat, 24 Dec 2022 at 05:05, Luke Chen  wrote:
> > >>
> > >>> Congratulations, Satish!
> > >>>
> > >>> On Sat, Dec 24, 2022 at 4:12 AM Federico Valeri <
> fedeval...@gmail.com>
> > >>> wrote:
> > >>>
> >  Hi Satish, congrats!
> > 
> >  On Fri, Dec 23, 2022, 8:46 PM Viktor Somogyi-Vass
> >   wrote:
> > 
> > > Congrats Satish!
> > >
> > > On Fri, Dec 23, 2022, 19:38 Mickael Maison <
> mickael.mai...@gmail.com
> > >>>
> > > wrote:
> > >
> > >> Congratulations Satish!
> > >>
> > >> On Fri, Dec 23, 2022 at 7:36 PM Divij Vaidya <
> > >>> divijvaidy...@gmail.com>
> > >> wrote:
> > >>>
> > >>> Congratulations Satish! 
> > >>>
> > >>> On Fri 23. Dec 2022 at 19:32, Josep Prat
> > >>>  > >
> > >>> wrote:
> > >>>
> >  Congrats Satish!
> > 
> >  ———
> >  Josep Prat
> > 
> >  Aiven Deutschland GmbH
> > 
> >  Immanuelkirchstraße 26, 10405 Berlin
> > 
> >  Amtsgericht Charlottenburg, HRB 209739 B
> > 
> >  Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> > 
> >  m: +491715557497
> > 
> >  w: aiven.io
> > 
> >  e: josep.p...@aiven.io
> > 
> >  On Fri, Dec 23, 2022, 19:23 Chris Egerton <
> > >>> fearthecel...@gmail.com
> > >
> > >> wrote:
> > 
> > > Congrats, Satish!
> > >
> > > On Fri, Dec 23, 2022, 13:19 Arun Raju 
> > > wrote:
> > >
> > >> Congratulations 
> > >>
> > >> On Fri, Dec 23, 2022, 1:08 PM Jun Rao
> > >>>  > >
> >  wrote:
> > >>
> > >>> Hi, Everyone,
> > >>>
> > >>> The PMC of Apache Kafka is pleased to announce a new
> > >> Kafka
> > >> committer
> > >> Satish
> > >>> Duggana.
> > >>>
> > >>> Satish has been a long time Kafka contributor since 2017.
> > >>> He
> >  is
> > >> the
> > > main
> > >>> driver behind KIP-405 that integrates Kafka with remote
> > > storage,
> > >> a
> > >>> significant and much anticipated feature in Kafka.
> > >>>
> > >>> Congratulations, Satish!
> > >>>
> > >>> Thanks,
> > >>>
> > >>> Jun (on behalf of the Apache Kafka PMC)
> > >>>
> > >>
> > >
> > 
> > >>> --
> > >>> Divij Vaidya
> > >>
> > >
> > 
> > >>>
> > >>
> > >
> >
>


Re: [ANNOUNCE] New committer: Justine Olshan

2023-01-17 Thread Guozhang Wang
Congratulations, Justine (I'm also late)!

On Wed, Jan 11, 2023 at 12:17 AM Bruno Cadonna  wrote:

> Hi Justine,
>
> Re-reading my message I realized that my message might be
> misinterpreted. I meant that I am late with congratulating you due to
> the holidays, NOT that it took you long becoming a committer!
>
> Sorry for the potential confusion!
>
> Best,
> Bruno
>
> On 11.01.23 08:57, Bruno Cadonna wrote:
> > Better late than never!
> >
> > Congrats!
> >
> > Best,
> > Bruno
> >
> > On 04.01.23 20:25, Kirk True wrote:
> >> Congratulations!
> >>
> >> On Tue, Jan 3, 2023, at 7:34 PM, John Roesler wrote:
> >>> Congrats, Justine!
> >>> -John
> >>>
> >>> On Tue, Jan 3, 2023, at 13:03, Matthias J. Sax wrote:
>  Congrats!
> 
>  On 12/29/22 6:47 PM, ziming deng wrote:
> > Congratulations Justine!
> > —
> > Best,
> > Ziming
> >
> >> On Dec 30, 2022, at 10:06, Luke Chen  wrote:
> >>
> >> Congratulations, Justine!
> >> Well deserved!
> >>
> >> Luke
> >>
> >> On Fri, Dec 30, 2022 at 9:15 AM Ron Dagostino 
> >> wrote:
> >>
> >>> Congratulations, Justine! Well-deserved, and I’m very happy
> >>> for you.
> >>>
> >>> Ron
> >>>
>  On Dec 29, 2022, at 6:13 PM, Israel Ekpo 
>  wrote:
> 
>  Congratulations Justine!
> 
> 
> > On Thu, Dec 29, 2022 at 5:05 PM Greg Harris
> >>> 
> > wrote:
> >
> > Congratulations Justine!
> >
> >> On Thu, Dec 29, 2022 at 1:37 PM Bill Bejeck
> >>  wrote:
> >>
> >> Congratulations Justine!
> >>
> >>
> >> -Bill
> >>
> >>> On Thu, Dec 29, 2022 at 4:36 PM Philip Nee <
> philip...@gmail.com>
> >>> wrote:
> >>
> >>> wow congrats!
> >>>
> >>> On Thu, Dec 29, 2022 at 1:05 PM Chris Egerton <
> >>> fearthecel...@gmail.com
> >>
> >>> wrote:
> >>>
>  Congrats, Justine!
> 
>  On Thu, Dec 29, 2022, 15:58 David Jacot 
>  wrote:
> 
> > Hi all,
> >
> > The PMC of Apache Kafka is pleased to announce a new Kafka
> > committer
> > Justine
> > Olshan.
> >
> > Justine has been contributing to Kafka since June 2019. She
> >> contributed
>  53
> > PRs including the following KIPs.
> >
> > KIP-480: Sticky Partitioner
> > KIP-516: Topic Identifiers & Topic Deletion State
> Improvements
> > KIP-854: Separate configuration for producer ID expiry
> > KIP-890: Transactions Server-Side Defense (in progress)
> >
> > Congratulations, Justine!
> >
> > Thanks,
> >
> > David (on behalf of the Apache Kafka PMC)
> >
> 
> >>>
> >>
> >
> >>>
> >
> >
> >>>
> >>
>


Re: [ANNOUNCE] New committer: Walker Carlson

2023-01-17 Thread Guozhang Wang
Congrats, Walker!

On Tue, Jan 17, 2023 at 2:20 PM Chris Egerton 
wrote:

> Congrats, Walker!
>
> On Tue, Jan 17, 2023, 17:07 Bill Bejeck  wrote:
>
> > Congratulations, Walker!
> >
> > -Bill
> >
> > On Tue, Jan 17, 2023 at 4:57 PM Matthias J. Sax 
> wrote:
> >
> > > Dear community,
> > >
> > > I am pleased to announce Walker Carlson as a new Kafka committer.
> > >
> > > Walker has been contributing to Apache Kafka since November 2019. He
> > > made various contributions including the following KIPs.
> > >
> > > KIP-671: Introduce Kafka Streams Specific Uncaught Exception Handler
> > > KIP-696: Update Streams FSM to clarify ERROR state meaning
> > > KIP-715: Expose Committed offset in streams
> > >
> > >
> > > Congratulations Walker and welcome on board!
> > >
> > >
> > > Thanks,
> > >-Matthias (on behalf of the Apache Kafka PMC)
> > >
> >
>


[ANNOUNCE] New Kafka PMC Member: Bruno Cadonna

2022-11-01 Thread Guozhang Wang
Hi everyone,

I'd like to introduce our new Kafka PMC member, Bruno.

Bruno has been a committer since April 2021 and has been very active in
the community. He's a key contributor to Kafka Streams, and also helped
review a lot of horizontal improvements such as Mockito. It is my pleasure
to announce that Bruno has agreed to join the Kafka PMC.

Congratulations, Bruno!

-- Guozhang Wang, on behalf of Apache Kafka PMC


[VOTE] KIP-869: Improve Streams State Restoration Visibility

2022-10-12 Thread Guozhang Wang
Hello all,

I'd like to start a vote for the following KIP, aiming to improve Kafka
Streams' restoration visibility via new metrics and callback methods:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility


Thanks!
-- Guozhang


Re: [DISCUSS] KIP-869: Improve Streams State Restoration Visibility

2022-10-12 Thread Guozhang Wang
I've updated the KIP doc and will now start calling for a vote on this
KIP.

On Tue, Oct 11, 2022 at 4:26 AM Nick Telford  wrote:

> Hi Guozhang,
>
> What you propose sounds good to me. Having the more detailed Task-level
> metrics at DEBUG makes sense.
>
> Regards,
>
> Nick
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-869: Improve Streams State Restoration Visibility

2022-10-10 Thread Guozhang Wang
ation from each poll?
> >
> > 3.
> > Could you please give an example where "active-records-restored-total"
> > and "standby-records-updated-total" are useful?
> >
> > Best,
> > Bruno
> >
> > On 20.09.22 22:45, Guozhang Wang wrote:
> > > Hello Bruno/Nick,
> > >
> > > I've updated the KIP wiki to reflect the incorporated comments from
> you,
> > > please feel free to take another look and let me know what you think.
> > >
> > >
> > > Guozhang
> > >
> > > On Tue, Sep 20, 2022 at 9:37 AM Guozhang Wang 
> > wrote:
> > >
> > >> Hi Nick,
> > >>
> > >> Thanks for the reviews, and I think these are good suggestions. Note
> > >> that currently the `restore-records-total/rate` would include both
> > >> restoring active tasks as well as updating standby tasks. I think for
> > >> your purposes you'd be more interested in active restoring tasks since
> > >> they could complete, while updating standby tasks would not complete
> > >> even if they have caught up with the active. At the same time, the
> > >> changelog reader would only be restoring either active or standby at a
> > >> given time, and active tasks have a higher priority such that as long
> > >> as there is at least one active task still restoring, we would not
> > >> restore any standby tasks. From your suggestion, I'm thinking that
> > >> maybe I should break up the rate / total metric for active and standby
> > >> tasks separately.
> > >>
> > >> For deriving estimated time remaining though, the `total` metric may
> > >> not be helpful since it will not be "reset" after rebalances, i.e. it
> > >> will be an ever-increasing number and record the total number of
> > >> records for the lifetime of the app. But still, just the remaining
> > >> records alone, with the time elapsed monitored by the apps, should be
> > >> sufficient to get the estimated time remaining.
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Tue, Sep 20, 2022 at 3:10 AM Nick Telford 
> > >> wrote:
> > >>
> > >>> Hi Guozhang,
> > >>>
> > >>> KIP looks great, I have one suggestion: in addition to
> > >>> "restore-records-total", it would also be useful to track the number
> > >>> of records *remaining*, that is, the records that have not yet been
> > >>> restored. This is actually the metric I was attempting to implement
> > >>> in the StateRestoreListener that bumped me up against KAFKA-10575 :-)
> > >>>
> > >>> With both a "restore-records-total" and a "restore-remaining-total"
> > >>> (or similar) metric, it's possible to derive useful information like
> > >>> the estimated time remaining for restoration (by dividing the
> > >>> remaining total by the restoration rate).
> > >>>
> > >>> Regards,
> > >>>
> > >>> Nick
> > >>>
> > >>> On Mon, 19 Sept 2022 at 19:57, Guozhang Wang 
> > wrote:
> > >>>
> > >>>> Hello Bruno,
> > >>>>
> > >>>> Thanks for your comments!
> > >>>>
> > >>>> 1. Regarding the metrics group name: originally I put
> > >>>> "stream-state-metrics" as it's related to state store restorations,
> > >>>> but after a second thought I think I agree with you that this is
> > >>>> quite confusing and not right. About the metrics groups, right now I
> > >>>> have two ideas:
> > >>>>
> > >>>> a) Still use the metric group name "stream-thread-metrics", but
> > >>>> differentiate with the processing threads on the thread id. The pro
> > >>>> is that we do not introduce a new group; the con is that users who
> > >>>> want to separate processing from restoration/updating in the future
> > >>>> need to do that on the thread id labels.
> > >>>> b) Introduce a new group name, for example
> > >>>> "stream-state-updater-metrics"
> > >>>>

Re: [ANNOUNCE] New committer: Deng Ziming

2022-10-10 Thread Guozhang Wang
Congrats Ziming! Thanks a lot for your contributions.

Guozhang

On Mon, Oct 10, 2022 at 9:31 AM Jason Gustafson 
wrote:

> Hi All
>
> The PMC for Apache Kafka has invited Deng Ziming to become a committer,
> and we are excited to announce that he has accepted!
>
> Ziming has been contributing to Kafka for about three years. He has
> authored
> more than 100 patches and helped to review nearly as many. In particular,
> he made significant contributions to the KRaft project which had a big part
> in reaching our production readiness goal in the 3.3 release:
> https://blogs.apache.org/kafka/entry/what-rsquo-s-new-in.
>
> Please join me in congratulating Ziming! Thanks for all of your
> contributions!
>
> -- Jason, on behalf of the Apache Kafka PMC
>


-- 
-- Guozhang


Re: [DISCUSS] Apache Kafka 3.4.0 release

2022-10-06 Thread Guozhang Wang
4.0 is gonna be the next big milestone :) Thanks Sophie!

On Wed, Oct 5, 2022 at 6:08 PM David Jacot  wrote:

> +1. Thanks, Sophie!
>
> Le mer. 5 oct. 2022 à 19:57, Luke Chen  a écrit :
>
> > Hi Sophie,
> >
> > Thanks for volunteering!
> >
> > Luke
> >
> > On Thu, Oct 6, 2022 at 6:17 AM José Armando García Sancio
> >  wrote:
> >
> > > Thanks for volunteering Sophie.
> > >
> > > On Wed, Oct 5, 2022 at 3:01 PM Sophie Blee-Goldman
> > >  wrote:
> > > >
> > > > Hey all,
> > > >
> > > > I'd like to volunteer as release manager for the next feature
> release,
> > > > which will be Apache
> > > > Kafka 3.4.0. If that sounds good to everyone I'll update this thread
> > with
> > > > the release plan in the coming week.
> > > >
> > > > Cheers,
> > > > A. Sophie Blee-Goldman
> > >
> > >
> > >
> > > --
> > > -José
> > >
> >
>


-- 
-- Guozhang


Re: [VOTE] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-10-04 Thread Guozhang Wang
Hello David,

I've made my final pass on the doc and I think it looks good now. +1.


Guozhang

On Wed, Sep 14, 2022 at 1:37 PM Guozhang Wang  wrote:

> Thanks David,
>
> There are a few minor comments pending in the discussion thread, and one
> is about whether we should merge PreparePartitionAssignment with HB. But I
> think the KIP itself is in pretty good shape now. Thanks!
>
>
> Guozhang
>
> On Fri, Sep 9, 2022 at 1:32 AM David Jacot 
> wrote:
>
>> Hi all,
>>
>> Thank you all for the very positive discussion about KIP-848. It looks
>> like folks are very positive about it overall.
>>
>> I would like to start a vote on KIP-848, which introduces a brand new
>> consumer rebalance protocol.
>>
>> The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D.
>>
>> Best,
>> David
>>
>
>
> --
> -- Guozhang
>


-- 
-- Guozhang


Re: Request for contributor permissions

2022-10-02 Thread Guozhang Wang
Hello Ashmeet,

Thanks for your interest in contributing. I've added you with contributor
permissions.

Guozhang

On Sat, Oct 1, 2022 at 4:20 AM Ashmeet Lamba  wrote:

> Hello,
>
> I'd like to request for contributor permissions. JIRA ID: ashmeetlamba
>
> Thank you.
> Ashmeet
>


-- 
-- Guozhang


Re: I would like to contribute

2022-09-29 Thread Guozhang Wang
Done. Cheers!

You can follow the KIP guidance
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
for any changes that involve Public interfaces.

Just a side note: in general the community is more interested in seeing
concrete usage when accepting new utils / tooling etc --- e.g. when you
file the KIP you would need to write a convincing "motivation" section
in the page to make the argument --- so if you have already spotted some
places in Kafka that can immediately benefit from the proposed new
classes like piped serdes, it would be great to include them in the
proposal.


Guozhang

On Thu, Sep 29, 2022 at 1:21 PM Collins, Alex
 wrote:

> Awesome. Yes. alex_collins
> ____
> From: Guozhang Wang 
> Sent: Thursday, September 29, 2022 12:55:27 PM
> To: dev@kafka.apache.org 
> Subject: Re: I would like to contribute
>
>
> Hello Alex,
>
> Thank you for your interest! Do you already have an Apache ID? If yes, I
> can help you get permission to create the corresponding wiki pages.
>
> On Thu, Sep 29, 2022 at 11:56 AM Collins, Alex
>  wrote:
>
> > I have three things to contribute:
> >
> >
> >   *   Bug fix for console producer/consumers. I’ve created a PR.
> >   *   Two new serdes which can be understood from the PRs:
> >  *   https://github.com/apache/kafka/pull/12698
> >  *   https://github.com/apache/kafka/pull/12699
> >
> > The serdes change “org/apache/kafka/common/serialization” which are
> > “public interfaces”. Changes to public interfaces need a KIP. So I need
> to
> > create KIPs.
> >
> > Alex
> >
> > --
> > Principal Software Engineer, API Externalization
> >
> >
>
> --
> -- Guozhang
>


-- 
-- Guozhang


Re: I would like to contribute

2022-09-29 Thread Guozhang Wang
Hello Alex,

Thank you for your interest! Do you already have an Apache ID? If yes, I
can help you get permission to create the corresponding wiki pages.

On Thu, Sep 29, 2022 at 11:56 AM Collins, Alex
 wrote:

> I have three things to contribute:
>
>
>   *   Bug fix for console producer/consumers. I’ve created a PR.
>   *   Two new serdes which can be understood from the PRs:
>  *   https://github.com/apache/kafka/pull/12698
>  *   https://github.com/apache/kafka/pull/12699
>
> The serdes change “org/apache/kafka/common/serialization” which are
> “public interfaces”. Changes to public interfaces need a KIP. So I need to
> create KIPs.
>
> Alex
>
> --
> Principal Software Engineer, API Externalization
>
>

-- 
-- Guozhang


Re: [DISCUSS] KIP-871: Fix ByteBufferSerializer#serialize(String, ByteBuffer) compatible problem

2022-09-26 Thread Guozhang Wang
Hello ShunKang,

Thanks for filing the report. I looked at the source code and I agree it's
a bug. The reason we did not hit it in the past is probably just that we
were inefficiently byte-copying all the time, and hence in almost all cases
the offset was 0 when serialization happened.
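
To illustrate the general class of bug (this is a Python analogy, not the
Java client code; both function names and the explicit offset/length
parameters are hypothetical), a serializer that ignores the buffer's window
only gives the right bytes when the window starts at offset 0 and covers the
whole backing array:

```python
def serialize_naive(backing: bytearray, offset: int, length: int) -> bytes:
    """Buggy variant: ignores offset/length and returns the whole array."""
    return bytes(backing)


def serialize_windowed(backing: bytearray, offset: int, length: int) -> bytes:
    """Honors the window: copies only backing[offset:offset + length]."""
    return bytes(memoryview(backing)[offset:offset + length])
```

With a fresh copy per record the window always spans the full array, so both
variants agree; they only diverge once a caller passes a slice of a larger
buffer.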

Note that this fix does not really require a KIP since it does not change
any public APIs (for details on what kind of changes require a KIP
discussion, please see
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-Whatisconsidereda%22majorchange%22thatneedsaKIP
?)


Guozhang

On Sun, Sep 25, 2022 at 6:52 AM ShunKang Lin 
wrote:

> Hi all,
>
> I'd like to start a new discussion thread on KIP-871 (Kafka Client) which
> proposes that fix ByteBufferSerializer#serialize(String, ByteBuffer)
> compatible problem.
>
> Links:
>
> KIP:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495816
>
> Jira: https://issues.apache.org/jira/browse/KAFKA-4852
>
> Thanks,
> ShunKang
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-26 Thread Guozhang Wang
Re versions: fair enough, I think it's okay to keep it as strings. Just to
clarify the concern I had is that if we do want to augment it in the
future, it will be harder to change a `string` field into a `struct` :P

Re onAssignment: actually even with the current proposal, since we are
giving the final encoded metadata upon the first `onAssignment` call for
the generation already, Streams would infer all the standby tasks that need
to be migrated in / out, so for Streams' purposes we'd need to handle such
decoupling anyway. And it's actually the opposite: if the first
`onAssignment` does not include those yet-to-be-assigned partitions, then
Streams would not know whether a standby that is migrating out will really
be promoted to active later or should be completely removed, until the next
`onAssignment` is called. And that's where I mentioned "we'd need to figure
out which onAssignment is the final call" etc. If we have all the
partitions in `onAssignment`, then we can infer such actions and decide
whether we should close a standby task immediately or just recycle it and
wait for the active task to be assigned eventually.

On the other hand, if we call onAssignment incrementally, similar to
onPartitionsAssigned, and include both assigned and yet-to-be-assigned
partitions, then people who implement both interfaces can just ignore
`onPartitionsAssigned` since they already have all the knowledge they need
from onAssignment. And that's what I'm trying to avoid by keeping the
functionality of the two more separated.


Guozhang




On Mon, Sep 26, 2022 at 11:30 AM David Jacot 
wrote:

> Regarding the version, I would rather add this later when we have a
> clear use case for it. It seems to me that we are speculating here. I
> understand your point but I am not fully convinced by the solution at
> the moment. Would you agree with this?
>
> Regarding the onAssignment, I was thinking about the case where a task
> is promoted from standby to active. In this case, having onAssignment
> with the combined partitions could make it difficult, no? I am
> thinking that the assignor will have to remove the standby based on
> the metadata but it won't know what to do after that. If the partition
> is assigned directly, it can convert it to an active task. On the
> other hand, if the partition is not available yet, it would have to
> keep it as a standby until the partition is really assigned. It seems
> to be that the assignor won't have the information to make this
> decision, no? In this case, decouple the "why" from the "when" seems
> to make things harder. I am not so familiar with Streams though so my
> intuition could be wrong here.
>
> David
>
> On Mon, Sep 26, 2022 at 7:26 PM Guozhang Wang  wrote:
> >
> > Regarding the version, what I was thinking is that in the HB request, for
> > "serverAssignor" field, instead of just having it as a single string
> field,
> > maybe we could consider also making it a structure that includes: name,
> > minimumVersion, maximumVersion. Where the minimumVersion/maximumVersion
> > means the versions of the server assignor that the client work best with.
> > That being said, I agree with you that such information may also be
> > inferred elsewhere e.g. by looking into the "rackId" field, and see if it
> > contains a hyphen or not etc. All I was wondering is that, if such
> version
> > information would be useful for the server assignors to determine its
> > actual assignment logic. I do not feel very strong about this one though
> > --- even if we do not add it now, we can potentially add later, it's just
> > that changing a single string field to a structure would be hard for
> > compatibility and we'd then probably have to add top-level fields.
> >
> > Regarding the `onAssignment` logic, again my train of thoughts is that,
> if
> > users want to know exactly when a partition is assigned / revoked, they
> > would be leveraging on the rebalance callbacks, as that's what people
> > should rely on to determine "when" partitions are assigned. The
> > `onAssignment` should be used for getting "why" such partition assignment
> > decision is made, and hence returning `combined partitions` would be
> okay.
> > Streams e.g. implement both rebalance callbacks and the assignors, and it
> > gets the "when" from the former (and create/close active tasks
> accordingly)
> > and the "why" from the latter (and update its global info bookkeeping as
> > well as standby maintenance accordingly). Most users would be just
> > interested in the rebalance callback, and not implement their own
> assignor
> > at all if they do not care about "why" as they trust

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-26 Thread Guozhang Wang
Regarding the version, what I was thinking is that in the HB request, for
the "serverAssignor" field, instead of just having it as a single string field,
maybe we could consider making it a structure that includes: name,
minimumVersion, maximumVersion, where minimumVersion/maximumVersion
mean the versions of the server assignor that the client works best with.
That being said, I agree with you that such information may also be
inferred elsewhere, e.g. by looking into the "rackId" field and seeing if it
contains a hyphen or not. All I was wondering is whether such version
information would be useful for the server assignors to determine their
actual assignment logic. I do not feel very strongly about this one, though:
even if we do not add it now, we can potentially add it later. It's just
that changing a single string field to a structure would be hard for
compatibility, and we'd then probably have to add top-level fields.
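
To make the idea concrete, here is a rough sketch of such a structure and of
how a broker might pick a version from it. Only the three field names come
from the paragraph above; the class names, the integer version type, and the
selection rule are assumptions for illustration, not part of the KIP:

```java
import java.util.Optional;

// Hypothetical sketch: only name/minimumVersion/maximumVersion come from the email.
final class ServerAssignorSketch {
    final String name;
    final int minimumVersion;
    final int maximumVersion;

    ServerAssignorSketch(String name, int minimumVersion, int maximumVersion) {
        this.name = name;
        this.minimumVersion = minimumVersion;
        this.maximumVersion = maximumVersion;
    }

    // Assumed rule: use the highest version that both the client's range and the
    // broker's supported range [brokerMin, brokerMax] have in common, if any.
    Optional<Integer> selectVersion(int brokerMin, int brokerMax) {
        int low = Math.max(minimumVersion, brokerMin);
        int high = Math.min(maximumVersion, brokerMax);
        return low <= high ? Optional.of(high) : Optional.empty();
    }
}
```

For example, a client that only understands version 1 of a rack-aware
assignor would get version 1 even from a broker that also implements 2.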

Regarding the `onAssignment` logic, again my train of thought is that if
users want to know exactly when a partition is assigned / revoked, they
would be leveraging the rebalance callbacks, as that's what people
should rely on to determine "when" partitions are assigned. The
`onAssignment` should be used for getting "why" such a partition assignment
decision is made, and hence returning `combined partitions` would be okay.
Streams, e.g., implements both the rebalance callbacks and the assignors, and it
gets the "when" from the former (and creates/closes active tasks accordingly)
and the "why" from the latter (and updates its global info bookkeeping as
well as standby maintenance accordingly). Most users would be interested just
in the rebalance callback, and would not implement their own assignor
at all if they do not care about "why" (as they trust the server assignors
to take good care of that) and care only about "when". So if we did build
these two types of APIs from scratch, I'd indeed feel that providing only
the metadata, not the partitions, for `onAssignment` may be less confusing
and push users to separate the usage of the two more clearly, but since
we already introduced partitions in `onAssignment` for compatibility I'm
less keen on removing them.


Guozhang

On Mon, Sep 26, 2022 at 6:55 AM David Jacot 
wrote:

> Hi Guozhang,
>
> Regarding the version, my understanding is that the version would be
> either the client software version or the request version, is this
> correct? If so, we could indeed pass this information down to the
> assignor via the interface. One way would be to pass a "server
> context" to the assignor and that context would include that
> information (and perhaps more). Is this what you are looking for?
>
> Regarding the onAssignment, I think that I understand your point. I
> suppose that the assignor could also be clever and keep track of the
> last metadata to decide whether it has to do something or not. One
> question that is still not clear to me is whether the assignor needs
> to know all the assigned partitions upfront regardless of whether they
> are already revoked or not. Do you think that we need this as well?
>
> From an API perspective, we could have something like
> onAssignment(Metadata(version, reason, metadata, assigned partitions,
> pending partitions)). Where the assigned partitions are the partitions
> ready to be used and the pending partitions are the one assigned to
> the member but not revoked yet. I find it a bit weird that this method
> would be called only once because the assignor would not know when the
> pending partitions changes. That does not look like a clean API. An
> alternative would be to use onAssignment(Metadata(version, reason,
> metadata, combined partitions)) but this seems error prone because it
> is not clear whether a partition is usable or not. Or do you think
> that we should not provide the partitions but only the metadata?
>
> Best,
> David
>
> On Fri, Sep 23, 2022 at 9:40 PM Guozhang Wang  wrote:
> >
> > Hello David,
> >
> > On Fri, Sep 23, 2022 at 2:00 AM David Jacot  >
> > wrote:
> >
> > > Hey,
> > >
> > > > Just to clarify I was asking about the `version` of the assignor
> (i.e. up
> > > to what version that the client would support), and I do agree we
> would not
> > > need metadata. What I have in mind is that, for some specific built-in
> > > broker-assignors, e.g. rack-aware assignors, if it's possible that in a
> > > newer version we would have a hierarchical rack ID string format, like
> > > "tier1-tier2" etc, but if some client has not upgraded their rack ID
> > > would still be in old format. In this case, the broker then needs to
> choose
> > > the old ve

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-23 Thread Guozhang Wang
 cases now.

The implementers of the assignor, though, would care about "how the
assignment was made": that includes from which rebalance a certain
revoke/assign decision was made, based on what metadata such an assignment is
made, etc. And that's the whole point of the `onAssignment` function, since
otherwise they can just rely on the listeners. The usual implementation
logic of this callback is to, e.g., bookkeep the factors driving the assignment
decision, a.k.a. the metadata, global information that needs to be propagated
to all members, etc. Take Streams as an example: the active processing
tasks go along with the assigned partitions, and we can always just
incrementally create / close them each time the rebalance listener triggers,
when certain partitions are revoked or assigned together. Standby tasks,
however, are encoded in the metadata, and we can only know which standby
tasks we should get / drop based on the `onAssignment` function. In
fact, the creation of such tasks as a result of the metadata bookkeeping
does not need to wait until all the yet-to-be-assigned partitions have
been completely assigned to the member. Such information may not always be
updatable in an incremental manner the way partitions-revoked /
partitions-assigned are. In such a case, it's better to just trigger this
function "once per decision made", i.e. once per rebalance generation.
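
As a rough illustration of that split, here is a sketch in which
`onAssignment` fires once per generation and drives the standby bookkeeping,
while the listener callbacks fire incrementally and drive active tasks. The
method names mirror the callbacks discussed in this thread, but the
signatures and the class are assumptions, not the KIP's or Streams' API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: signatures and class are assumed, only the callback
// names mirror the ones discussed in the thread.
final class AssignmentBookkeepingSketch {
    private int generation = -1;
    private final List<String> standbyTasks = new ArrayList<>();
    private final List<String> activeTasks = new ArrayList<>();

    // Triggered once per rebalance generation: standby tasks come from the
    // encoded metadata and can be set up without waiting for pending partitions.
    void onAssignment(int newGeneration, List<String> standbysFromMetadata) {
        generation = newGeneration;
        standbyTasks.clear();
        standbyTasks.addAll(standbysFromMetadata);
    }

    // Triggered incrementally: active tasks follow the partitions as they arrive.
    void onPartitionsAssigned(List<String> partitions) {
        activeTasks.addAll(partitions);
    }

    // Triggered when partitions are taken away: drop the matching active tasks.
    void onPartitionsRevoked(List<String> partitions) {
        activeTasks.removeAll(partitions);
    }

    int generation() { return generation; }
    List<String> standbyTasks() { return new ArrayList<>(standbyTasks); }
    List<String> activeTasks() { return new ArrayList<>(activeTasks); }
}
```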


Guozhang



> Best,
> David
>
> On Thu, Sep 22, 2022 at 6:01 PM Guozhang Wang  wrote:
> >
> > Hi David, thanks for all the detailed explanations. I think they all make
> > sense. Just want to have a couple follow-ups here:
> >
> > > I don't really see the benefits here because server side assignors
> > don't have metadata at all. They only assign topic-partitions. They
> > are not supposed to generate metadata nor to receive metadata from the
> > members.
> >
> > Just to clarify I was asking about the `version` of the assignor (i.e. up
> > to what version that the client would support), and I do agree we would
> not
> > need metadata. What I have in mind is that, for some specific built-in
> > broker-assignors, e.g. rack-aware assignors, if it's possible that in a
> > newer version we would have a hierarchical rack ID string format, like
> > "tier1-tier2" etc, but if some client has not upgraded their rack ID
> > would still be in old format. In this case, the broker then needs to
> choose
> > the old versioned assignor. I'm probably making something up here for
> rack
> > aware assignors, but I'm wondering if in general such an "auto-downgrade"
> > behavior would be needed still for broker-side assignor, and if yes would
> > "version" still be useful.
> >
> > > Yeah, that's right. Within a rebalance, `onAssignment` is called once
> > when the member transitions to a new epoch. This one contains the full
> > metadata provided by the client side assignor. Then, `onAssignment`
> > can be called max N times where N is the number of partitions pending
> > revocation by other members. Let me try to clarify this in the KIP.
> >
> > Okay, my understanding is that the calling ordering of these callbacks
> > would be like the following:
> >
> > 
> > onPartitionsRevoked();    // just once, since we do not really need
> >                           // to revoke incrementally.
> >
> > onAssignment();           // the first call, with epoch incremented
> > onPartitionsAssigned();   // paired with the onAssignment
> >
> > onAssignment();           // the first onAssignment would bump up the
> >                           // epoch, and the metadata reflected.
> > onPartitionsAssigned();   // each time we get an additional assignment, we
> >                           // call onAssignment, paired with an onPartitionsAssigned
> > ...
> > onAssignment();
> > onPartitionsAssigned();   // on each of the onAssignment calls, the encoded
> >                           // metadata would not change; only the incrementally
> >                           // added partitions are reflected
> >
> > Is that the case?
> >
> > I'm wondering if we would still call onAssignment just once, that encodes
> > all the assignment for this rebalance, including all the partitions that
> > should be assigned to the member but not yet assigned since they have not
> > been revoked by others. In that case the call ordering would be:
> >
> > 
> > onPartitionsRevoked();   // just once
> > onAssignment();          // just once, with epoch incremented, and metadata
> >                          // encoded changed; the "assignment" field also
> >                          // reflects the final target assignment
> > onPartitionsAssigned();   // multiple times, which rep

Re: [VOTE] KIP-863: Reduce Fetcher#parseRecord() memory copy

2022-09-22 Thread Guozhang Wang
+1, thanks ShunKang.

Though its proposed motivation is on consumer fetcher's deserialization, I
think adding an overloaded method with ByteBuffer would help with other
serde places on the client side as well.


Guozhang

On Thu, Sep 22, 2022 at 9:41 AM ShunKang Lin 
wrote:

> Hi everyone,
>
> I'd like to open the vote for KIP-863, which proposes to reduce memory
> allocation and memory copying in Fetcher#parseRecord(TopicPartition,
> RecordBatch, Record).
>
> The proposal is here:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035
>
> Thanks to all who reviewed the proposal, and thanks in advance for taking
> the time to vote!
>
> Best,
> ShunKang
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-863: Reduce Fetcher#parseRecord() memory copy

2022-09-22 Thread Guozhang Wang
Could you start a separate VOTE email thread calling for votes?

On Thu, Sep 22, 2022 at 9:19 AM ShunKang Lin 
wrote:

> Hi Guozhang,
>
> Thanks for your help! By the way, what should I do next?
>
> Best,
> ShunKang
>
> On Thu, Sep 22, 2022 at 11:21 PM Guozhang Wang wrote:
>
> > Thanks ShunKang,
> >
> > I made a few nit edits on the Motivation section as well. LGTM for me
> now.
> >
> > On Thu, Sep 22, 2022 at 7:33 AM ShunKang Lin 
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > I've updated the "Motivation" section of the KIP, please take a look.
> > >
> > > Thanks.
> > > ShunKang
> > >
> > > On Wed, Sep 21, 2022 at 1:26 AM Guozhang Wang wrote:
> > >
> > > > In this case, could you update the KIP to clarify the allocation
> > savings
> > > > more clearly in the "Motivation" section? Also you could mention that
> > for
> > > > user customizable serdes, if they could provide overwrites on the
> > > > overloaded function that's also possible for optimize memory
> > allocations.
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, Sep 20, 2022 at 10:24 AM Guozhang Wang 
> > > wrote:
> > > >
> > > > > 1. Ack, thanks.
> > > > > 2. Sounds good, thanks for clarifying.
> > > > >
> > > > > On Tue, Sep 20, 2022 at 9:50 AM ShunKang Lin <
> > > linshunkang@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> Hi Guozhang,
> > > > >>
> > > > >> Thanks for your comments!
> > > > >>
> > > > >> 1. We can reduce memory allocation if the key/value types happen
> to
> > be
> > > > >> ByteBuffer or String.
> > > > >> 2. I would like to add `default ByteBuffer
> > > serializeToByteBuffer(String
> > > > >> topic, Headers headers, T data)` in Serializer to reduce memory
> copy
> > > in
> > > > >> `KafkaProducer#doSend(ProducerRecord, Callback)`, but this change
> > is a
> > > > bit
> > > > >> big, I prefer to submit another one KIP to do the job.
> > > > >>
> > > > >> Thanks.
> > > > >> ShunKang
> > > > >>
> > > > >> On Tue, Sep 20, 2022 at 6:32 AM Guozhang Wang wrote:
> > > > >>
> > > > >> > Hello ShunKang,
> > > > >> >
> > > > >> > Thanks for filing the proposal, and sorry for the late reply!
> > > > >> >
> > > > >> > I looked over your KIP proposal and the PR, in general I think I
> > > agree
> > > > >> that
> > > > >> > adding an overloaded function with `ByteBuffer` param is
> > beneficial,
> > > > >> but I
> > > > >> > have a meta question regarding it's impact on Kafka consumer: my
> > > > >> > understanding from your PR is that, we can only save memory
> > > > allocations
> > > > >> if
> > > > >> > the key/value types happen to be ByteBuffer as well, otherwise
> we
> > > > would
> > > > >> > still do the `return deserialize(topic, headers,
> > > > Utils.toArray(data));`
> > > > >> > from default impls unless the user customized deserializers is
> > > > >> augmented to
> > > > >> > handle ByteBuffer directly, right?
> > > > >> >
> > > > >> >
> > > > >> > Guozhang
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> > On Sun, Aug 21, 2022 at 9:56 AM ShunKang Lin <
> > > > linshunkang@gmail.com
> > > > >> >
> > > > >> > wrote:
> > > > >> >
> > > > >> > > Hi all,
> > > > >> > >
> > > > >> > > I'd like to start a discussion on KIP-863 which is Reduce
> > > > >> > > Fetcher#parseRecord() memory copy. This KIP can reduce Kafka
> > > > Consumer
> > > > >> > > memory allocation by nearly 50% during fetch records.
> > > > >> > >
> > > > >> > > Please check
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035
> > > > >> > > and https://github.com/apache/kafka/pull/12545 for more
> > details.
> > > > >> > >
> > > > >> > > Any feedbacks and comments are welcomed.
> > > > >> > >
> > > > >> > > Thanks.
> > > > >> > >
> > > > >> >
> > > > >> >
> > > > >> > --
> > > > >> > -- Guozhang
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-863: Reduce Fetcher#parseRecord() memory copy

2022-09-22 Thread Guozhang Wang
Thanks ShunKang,

I made a few nit edits on the Motivation section as well. LGTM for me now.

On Thu, Sep 22, 2022 at 7:33 AM ShunKang Lin 
wrote:

> Hi Guozhang,
>
> I've updated the "Motivation" section of the KIP, please take a look.
>
> Thanks.
> ShunKang
>
> On Wed, Sep 21, 2022 at 1:26 AM Guozhang Wang wrote:
>
> > In this case, could you update the KIP to clarify the allocation savings
> > more clearly in the "Motivation" section? Also you could mention that for
> > user customizable serdes, if they could provide overwrites on the
> > overloaded function that's also possible for optimize memory allocations.
> >
> > Guozhang
> >
> > On Tue, Sep 20, 2022 at 10:24 AM Guozhang Wang 
> wrote:
> >
> > > 1. Ack, thanks.
> > > 2. Sounds good, thanks for clarifying.
> > >
> > > On Tue, Sep 20, 2022 at 9:50 AM ShunKang Lin <
> linshunkang@gmail.com>
> > > wrote:
> > >
> > >> Hi Guozhang,
> > >>
> > >> Thanks for your comments!
> > >>
> > >> 1. We can reduce memory allocation if the key/value types happen to be
> > >> ByteBuffer or String.
> > >> 2. I would like to add `default ByteBuffer
> serializeToByteBuffer(String
> > >> topic, Headers headers, T data)` in Serializer to reduce memory copy
> in
> > >> `KafkaProducer#doSend(ProducerRecord, Callback)`, but this change is a
> > bit
> > >> big, I prefer to submit another one KIP to do the job.
> > >>
> > >> Thanks.
> > >> ShunKang
> > >>
> > >> On Tue, Sep 20, 2022 at 6:32 AM Guozhang Wang wrote:
> > >>
> > >> > Hello ShunKang,
> > >> >
> > >> > Thanks for filing the proposal, and sorry for the late reply!
> > >> >
> > >> > I looked over your KIP proposal and the PR, in general I think I
> agree
> > >> that
> > >> > adding an overloaded function with `ByteBuffer` param is beneficial,
> > >> but I
> > >> > have a meta question regarding it's impact on Kafka consumer: my
> > >> > understanding from your PR is that, we can only save memory
> > allocations
> > >> if
> > >> > the key/value types happen to be ByteBuffer as well, otherwise we
> > would
> > >> > still do the `return deserialize(topic, headers,
> > Utils.toArray(data));`
> > >> > from default impls unless the user customized deserializers is
> > >> augmented to
> > >> > handle ByteBuffer directly, right?
> > >> >
> > >> >
> > >> > Guozhang
> > >> >
> > >> >
> > >> >
> > >> > On Sun, Aug 21, 2022 at 9:56 AM ShunKang Lin <
> > linshunkang@gmail.com
> > >> >
> > >> > wrote:
> > >> >
> > >> > > Hi all,
> > >> > >
> > >> > > I'd like to start a discussion on KIP-863 which is Reduce
> > >> > > Fetcher#parseRecord() memory copy. This KIP can reduce Kafka
> > Consumer
> > >> > > memory allocation by nearly 50% during fetch records.
> > >> > >
> > >> > > Please check
> > >> > >
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035
> > >> > > and https://github.com/apache/kafka/pull/12545 for more details.
> > >> > >
> > >> > > Any feedbacks and comments are welcomed.
> > >> > >
> > >> > > Thanks.
> > >> > >
> > >> >
> > >> >
> > >> > --
> > >> > -- Guozhang
> > >> >
> > >>
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-869: Improve Streams State Restoration Visibility

2022-09-20 Thread Guozhang Wang
Hello Bruno/Nick,

I've updated the KIP wiki to reflect the incorporated comments from you,
please feel free to take another look and let me know what you think.


Guozhang

On Tue, Sep 20, 2022 at 9:37 AM Guozhang Wang  wrote:

> Hi Nick,
>
> Thanks for the reviews, and I think these are good suggestions. Note that
> currently the `restore-records-total/rate` would include both restoring
> active tasks as well as updating standby tasks, I think for your purposes
> you'd be more interested in active restoring tasks since they could
> complete, while updating standby tasks would not complete even if they have
> caught up with the active. At the same time, the changelog reader would
> only be restoring either active or standby at a given time, and active
> tasks has a higher priority such that as long as there is at least one
> active task still restoring, we would not restore any standby tasks. From
> your suggestion, I'm thinking that maybe I should break up the rate / total
> metric for active and standby tasks separately.
>
> For deriving estimated time remaining though, the `total` metric may not
> be helpful since they will not be "reset" after rebalances, i.e. they will
> be an ever-increasing number and record the total number of records for the
> lifetime of the app. But still, just the remaining records alone, with the
> time elapsed monitored by the apps, should be sufficient to get the
> estimated time remaining.
>
>
> Guozhang
>
>
> On Tue, Sep 20, 2022 at 3:10 AM Nick Telford 
> wrote:
>
>> Hi Guozhang,
>>
>> KIP looks great, I have one suggestion: in addition to
>> "restore-records-total", it would also be useful to track the number of
>> records *remaining*, that is, the records that have not yet been restored.
>> This is actually the metric I was attempting to implement in the
>> StateRestoreListener that bumped me up against KAFKA-10575 :-)
>>
>> With both a "restore-records-total" and a "restore-remaining-total" (or
>> similar) metric, it's possible to derive useful information like the
>> estimated time remaining for restoration (by dividing the remaining total
>> by the restoration rate).
>>
>> Regards,
>>
>> Nick
>>
>> On Mon, 19 Sept 2022 at 19:57, Guozhang Wang  wrote:
>>
>> > Hello Bruno,
>> >
>> > Thanks for your comments!
>> >
>> > 1. Regarding the metrics group name: originally I put
>> > "stream-state-metrics" as it's related to state store restorations, but
>> > after a second thought I think I agree with you that this is quite
>> > confusing and not right. About the metrics groups, right now I have two
>> > ideas:
>> >
>> > a) Still use the metric group name "stream-thread-metrics", but
>> > differentiate with the processing threads on the thread id. The pros is
>> > that we do not introduce a new group, the cons is that users who want to
>> > separate processing from restoration/updating in the future needs to do
>> > that on the thread id labels.
>> > b) Introduce a new group name, for example
>> "stream-state-updater-metrics"
>> > and still have a thread-id label. We would be introducing a new group
>> which
>> > still have a thread-id, which may sound a bit weird (maybe if we do
>> that we
>> > could change the existing "stream-thread-metrics" into
>> > "stream-processor-metrics").
>> >
>> > Right now I'm leaning towards b) and maybe in the future rename
>> > "thread-metrics" to "processor-metrics", LMK what do you think.
>> >
>> > 2. Regarding the metric names: today we may also pause a standby tasks,
>> and
>> > hence I'm trying to differentiate updating standbys from paused
>> standbys.
>> > Right now I'm thinking we can change "restoring-standby-tasks" to
>> > "updating-standby-tasks", and change all other metrics' "restore"
>> (except
>> > the "restoring-active-tasks") to "state-update", a.k.a
>> > "state-update-ratio", "state-update-records-total",
>> > "updating-standby-tasks" etc.
>> >
>> > 3. Regarding the function name: yeah I think that's a valid concern, I
>> can
>> > change it to "onRestoreSuspended" since "Aborted" may confuse people
>> that
>> > previously called "onBatchRestored" are undone as part of the abortion.
>> >
>> >
>> > Guozhang
>> >
>> >

Re: [DISCUSS] KIP-863: Reduce Fetcher#parseRecord() memory copy

2022-09-20 Thread Guozhang Wang
In this case, could you update the KIP to describe the allocation savings
more clearly in the "Motivation" section? Also, you could mention that for
user-customizable serdes, providing overrides of the overloaded function
also makes it possible to optimize memory allocations.
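
To illustrate the point about overrides, here is a small sketch using a
simplified stand-in interface. It is hypothetical: it is not Kafka's
`Deserializer`, and the `Headers` parameter is omitted to keep it
self-contained. The default overload copies the buffer into a new array,
while a custom implementation can override it to read from the buffer
directly:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Simplified, hypothetical stand-in for a deserializer interface.
interface SketchDeserializer<T> {
    T deserialize(String topic, byte[] data);

    // Default overload: copies the remaining bytes into a new array, then delegates.
    default T deserialize(String topic, ByteBuffer data) {
        byte[] copy = new byte[data.remaining()];
        data.duplicate().get(copy);  // duplicate() leaves the caller's position untouched
        return deserialize(topic, copy);
    }
}

final class SketchStringDeserializer implements SketchDeserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    // Override: decode straight from the backing array when there is one,
    // which skips the intermediate byte[] allocation and copy.
    @Override
    public String deserialize(String topic, ByteBuffer data) {
        if (data.hasArray()) {
            return new String(data.array(),
                              data.arrayOffset() + data.position(),
                              data.remaining(),
                              StandardCharsets.UTF_8);
        }
        // Direct (off-heap) buffers have no backing array, so fall back to the copy.
        return SketchDeserializer.super.deserialize(topic, data);
    }
}
```

A deserializer that does not override the `ByteBuffer` overload still works,
it just pays for the extra copy.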

Guozhang

On Tue, Sep 20, 2022 at 10:24 AM Guozhang Wang  wrote:

> 1. Ack, thanks.
> 2. Sounds good, thanks for clarifying.
>
> On Tue, Sep 20, 2022 at 9:50 AM ShunKang Lin 
> wrote:
>
>> Hi Guozhang,
>>
>> Thanks for your comments!
>>
>> 1. We can reduce memory allocation if the key/value types happen to be
>> ByteBuffer or String.
>> 2. I would like to add `default ByteBuffer serializeToByteBuffer(String
>> topic, Headers headers, T data)` in Serializer to reduce memory copy in
>> `KafkaProducer#doSend(ProducerRecord, Callback)`, but this change is a bit
>> big, I prefer to submit another one KIP to do the job.
>>
>> Thanks.
>> ShunKang
>>
>> On Tue, Sep 20, 2022 at 6:32 AM Guozhang Wang wrote:
>>
>> > Hello ShunKang,
>> >
>> > Thanks for filing the proposal, and sorry for the late reply!
>> >
>> > I looked over your KIP proposal and the PR, in general I think I agree
>> that
>> > adding an overloaded function with `ByteBuffer` param is beneficial,
>> but I
>> > have a meta question regarding it's impact on Kafka consumer: my
>> > understanding from your PR is that, we can only save memory allocations
>> if
>> > the key/value types happen to be ByteBuffer as well, otherwise we would
>> > still do the `return deserialize(topic, headers, Utils.toArray(data));`
>> > from default impls unless the user customized deserializers is
>> augmented to
>> > handle ByteBuffer directly, right?
>> >
>> >
>> > Guozhang
>> >
>> >
>> >
>> > On Sun, Aug 21, 2022 at 9:56 AM ShunKang Lin > >
>> > wrote:
>> >
>> > > Hi all,
>> > >
>> > > I'd like to start a discussion on KIP-863 which is Reduce
>> > > Fetcher#parseRecord() memory copy. This KIP can reduce Kafka Consumer
>> > > memory allocation by nearly 50% during fetch records.
>> > >
>> > > Please check
>> > >
>> >
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035
>> > > and https://github.com/apache/kafka/pull/12545 for more details.
>> > >
>> > > Any feedbacks and comments are welcomed.
>> > >
>> > > Thanks.
>> > >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>
> --
> -- Guozhang
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-863: Reduce Fetcher#parseRecord() memory copy

2022-09-20 Thread Guozhang Wang
1. Ack, thanks.
2. Sounds good, thanks for clarifying.

On Tue, Sep 20, 2022 at 9:50 AM ShunKang Lin 
wrote:

> Hi Guozhang,
>
> Thanks for your comments!
>
> 1. We can reduce memory allocation if the key/value types happen to be
> ByteBuffer or String.
> 2. I would like to add `default ByteBuffer serializeToByteBuffer(String
> topic, Headers headers, T data)` in Serializer to reduce memory copy in
> `KafkaProducer#doSend(ProducerRecord, Callback)`, but this change is a bit
> big, I prefer to submit another one KIP to do the job.
>
> Thanks.
> ShunKang
>
> On Tue, Sep 20, 2022 at 6:32 AM Guozhang Wang wrote:
>
> > Hello ShunKang,
> >
> > Thanks for filing the proposal, and sorry for the late reply!
> >
> > I looked over your KIP proposal and the PR, in general I think I agree
> that
> > adding an overloaded function with `ByteBuffer` param is beneficial, but
> I
> > have a meta question regarding it's impact on Kafka consumer: my
> > understanding from your PR is that, we can only save memory allocations
> if
> > the key/value types happen to be ByteBuffer as well, otherwise we would
> > still do the `return deserialize(topic, headers, Utils.toArray(data));`
> > from default impls unless the user customized deserializers is augmented
> to
> > handle ByteBuffer directly, right?
> >
> >
> > Guozhang
> >
> >
> >
> > On Sun, Aug 21, 2022 at 9:56 AM ShunKang Lin 
> > wrote:
> >
> > > Hi all,
> > >
> > > I'd like to start a discussion on KIP-863 which is Reduce
> > > Fetcher#parseRecord() memory copy. This KIP can reduce Kafka Consumer
> > > memory allocation by nearly 50% during fetch records.
> > >
> > > Please check
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035
> > > and https://github.com/apache/kafka/pull/12545 for more details.
> > >
> > > Any feedbacks and comments are welcomed.
> > >
> > > Thanks.
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-869: Improve Streams State Restoration Visibility

2022-09-20 Thread Guozhang Wang
Hi Nick,

Thanks for the review; I think these are good suggestions. Note that
currently the `restore-records-total/rate` would include both restoring
active tasks as well as updating standby tasks. I think for your purposes
you'd be more interested in restoring active tasks, since they can
complete, while updating standby tasks would not complete even if they have
caught up with the active. At the same time, the changelog reader would
only be restoring either active or standby tasks at a given time, and active
tasks have a higher priority, such that as long as there is at least one
active task still restoring, we would not restore any standby tasks. From
your suggestion, I'm thinking that maybe I should break up the rate / total
metrics for active and standby tasks separately.

For deriving the estimated time remaining, though, the `total` metric may not be
helpful since it will not be "reset" after rebalances, i.e. it will be
an ever-increasing number recording the total number of records for the
lifetime of the app. But still, just the remaining records alone, with the
time elapsed monitored by the apps, should be sufficient to get the
estimated time remaining.
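
For example, an app could derive the estimate from two samples of a
"records remaining" value and the time between them, roughly as sketched
below. The helper and its parameter names are hypothetical and not part of
Kafka Streams or of the KIP:

```java
// Hypothetical helper: not part of Kafka Streams.
final class RestoreEtaSketch {
    // remainingAtStart and remainingNow are two samples of a "records remaining"
    // value; elapsedSeconds is the wall-clock time between the two samples.
    static double estimateSecondsRemaining(long remainingAtStart,
                                           long remainingNow,
                                           double elapsedSeconds) {
        if (remainingNow <= 0) {
            return 0.0;  // nothing left to restore
        }
        long restored = remainingAtStart - remainingNow;
        if (restored <= 0 || elapsedSeconds <= 0) {
            return Double.POSITIVE_INFINITY;  // no observed progress, no estimate
        }
        double recordsPerSecond = restored / elapsedSeconds;
        return remainingNow / recordsPerSecond;
    }
}
```

With 1000 records remaining at the first sample and 500 ten seconds later,
the observed rate is 50 records per second, so about 10 seconds remain.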


Guozhang


On Tue, Sep 20, 2022 at 3:10 AM Nick Telford  wrote:

> Hi Guozhang,
>
> KIP looks great, I have one suggestion: in addition to
> "restore-records-total", it would also be useful to track the number of
> records *remaining*, that is, the records that have not yet been restored.
> This is actually the metric I was attempting to implement in the
> StateRestoreListener that bumped me up against KAFKA-10575 :-)
>
> With both a "restore-records-total" and a "restore-remaining-total" (or
> similar) metric, it's possible to derive useful information like the
> estimated time remaining for restoration (by dividing the remaining total
> by the restoration rate).
>
> Regards,
>
> Nick
>
> On Mon, 19 Sept 2022 at 19:57, Guozhang Wang  wrote:
>
> > Hello Bruno,
> >
> > Thanks for your comments!
> >
> > 1. Regarding the metrics group name: originally I put
> > "stream-state-metrics" as it's related to state store restorations, but
> > after a second thought I think I agree with you that this is quite
> > confusing and not right. About the metrics groups, right now I have two
> > ideas:
> >
> > a) Still use the metric group name "stream-thread-metrics", but
> > differentiate with the processing threads on the thread id. The pros is
> > that we do not introduce a new group, the cons is that users who want to
> > separate processing from restoration/updating in the future needs to do
> > that on the thread id labels.
> > b) Introduce a new group name, for example "stream-state-updater-metrics"
> > and still have a thread-id label. We would be introducing a new group
> which
> > still have a thread-id, which may sound a bit weird (maybe if we do that
> we
> > could change the existing "stream-thread-metrics" into
> > "stream-processor-metrics").
> >
> > Right now I'm leaning towards b) and maybe in the future rename
> > "thread-metrics" to "processor-metrics", LMK what do you think.
> >
> > 2. Regarding the metric names: today we may also pause a standby tasks,
> and
> > hence I'm trying to differentiate updating standbys from paused standbys.
> > Right now I'm thinking we can change "restoring-standby-tasks" to
> > "updating-standby-tasks", and change all other metrics' "restore" (except
> > the "restoring-active-tasks") to "state-update", a.k.a
> > "state-update-ratio", "state-update-records-total",
> > "updating-standby-tasks" etc.
> >
> > 3. Regarding the function name: yeah I think that's a valid concern, I
> can
> > change it to "onRestoreSuspended" since "Aborted" may confuse people that
> > previously called "onBatchRestored" are undone as part of the abortion.
> >
> >
> > Guozhang
> >
> >
> >
> > On Mon, Sep 19, 2022 at 10:47 AM Bruno Cadonna 
> wrote:
> >
> > > Hi Guozhang,
> > >
> > > Thanks for the KIP! I think this KIP is a really nice addition to
> better
> > > understand what is going on in a Kafka Streams application.
> > >
> > > 1.
> > > The metric names "paused-active-tasks" and "paused-standby-tasks" might
> > > be a bit confusing since at least active tasks can be paused also
> > > outside of restoration.
> > >
> > > 2.
> > > Why is the type of the metrics "st

Re: [DISCUSS] KIP-863: Reduce Fetcher#parseRecord() memory copy

2022-09-19 Thread Guozhang Wang
A separate question regarding the proposed API: what do you think about
also augmenting the serializers with a ByteBuffer return type, to be
symmetric with the deserializers?



On Mon, Sep 19, 2022 at 3:32 PM Guozhang Wang  wrote:

> Hello ShunKang,
>
> Thanks for filing the proposal, and sorry for the late reply!
>
> I looked over your KIP proposal and the PR, in general I think I agree
> that adding an overloaded function with `ByteBuffer` param is beneficial,
> but I have a meta question regarding it's impact on Kafka consumer: my
> understanding from your PR is that, we can only save memory allocations if
> the key/value types happen to be ByteBuffer as well, otherwise we would
> still do the `return deserialize(topic, headers, Utils.toArray(data));`
> from default impls unless the user customized deserializers is augmented to
> handle ByteBuffer directly, right?
>
>
> Guozhang
>
>
>
> On Sun, Aug 21, 2022 at 9:56 AM ShunKang Lin 
> wrote:
>
>> Hi all,
>>
>> I'd like to start a discussion on KIP-863 which is Reduce
>> Fetcher#parseRecord() memory copy. This KIP can reduce Kafka Consumer
>> memory allocation by nearly 50% during fetch records.
>>
>> Please check
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035
>> and https://github.com/apache/kafka/pull/12545 for more details.
>>
>> Any feedbacks and comments are welcomed.
>>
>> Thanks.
>>
>
>
> --
> -- Guozhang
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-863: Reduce Fetcher#parseRecord() memory copy

2022-09-19 Thread Guozhang Wang
Hello ShunKang,

Thanks for filing the proposal, and sorry for the late reply!

I looked over your KIP proposal and the PR. In general I agree that adding
an overloaded function with a `ByteBuffer` param is beneficial, but I have
a meta question regarding its impact on the Kafka consumer: my
understanding from your PR is that we can only save memory allocations if
the key/value types happen to be ByteBuffer as well; otherwise we would
still do the `return deserialize(topic, headers, Utils.toArray(data));`
from the default impls, unless the user's customized deserializers are
augmented to handle ByteBuffer directly, right?
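
To make the question concrete, here is a minimal, self-contained Java sketch
of the pattern being discussed. `SketchDeserializer` and the two
implementations are hypothetical stand-ins written for this example (headers
are left out for brevity); they are not Kafka's actual interfaces.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a deserializer interface; not Kafka's API.
interface SketchDeserializer<T> {
    T deserialize(String topic, byte[] data);

    // Default overload: falls back to copying the buffer into a byte[].
    default T deserialize(String topic, ByteBuffer data) {
        if (data == null) {
            return null;
        }
        byte[] copy = new byte[data.remaining()];
        data.duplicate().get(copy); // duplicate() leaves the caller's position untouched
        return deserialize(topic, copy);
    }
}

// Relies on the default overload, so every record still costs one array copy.
final class CopyingStringDeserializer implements SketchDeserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }
}

// Overrides the ByteBuffer overload, so the buffer is handed through as-is.
final class ByteBufferPassThrough implements SketchDeserializer<ByteBuffer> {
    @Override
    public ByteBuffer deserialize(String topic, byte[] data) {
        return data == null ? null : ByteBuffer.wrap(data);
    }

    @Override
    public ByteBuffer deserialize(String topic, ByteBuffer data) {
        return data; // no intermediate byte[] allocation
    }
}
```

In this sketch only `ByteBufferPassThrough` avoids the copy; any
implementation that does not override the ByteBuffer overload behaves like
`CopyingStringDeserializer`.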


Guozhang



On Sun, Aug 21, 2022 at 9:56 AM ShunKang Lin 
wrote:

> Hi all,
>
> I'd like to start a discussion on KIP-863 which is Reduce
> Fetcher#parseRecord() memory copy. This KIP can reduce Kafka Consumer
> memory allocation by nearly 50% during fetch records.
>
> Please check
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=225152035
> and https://github.com/apache/kafka/pull/12545 for more details.
>
> Any feedbacks and comments are welcomed.
>
> Thanks.
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-869: Improve Streams State Restoration Visibility

2022-09-19 Thread Guozhang Wang
Hello Bruno,

Thanks for your comments!

1. Regarding the metrics group name: originally I put
"stream-state-metrics" as it's related to state store restorations, but
after a second thought I think I agree with you that this is quite
confusing and not right. About the metrics groups, right now I have two
ideas:

a) Still use the metric group name "stream-thread-metrics", but
differentiate from the processing threads by the thread id. The pro is
that we do not introduce a new group; the con is that users who want to
separate processing from restoration/updating in the future need to do
that on the thread id labels.
b) Introduce a new group name, for example "stream-state-updater-metrics",
and still have a thread-id label. We would be introducing a new group which
still has a thread-id, which may sound a bit weird (maybe if we do that we
could change the existing "stream-thread-metrics" into
"stream-processor-metrics").

Right now I'm leaning towards b), and maybe in the future renaming
"thread-metrics" to "processor-metrics". LMK what you think.

2. Regarding the metric names: today we may also pause standby tasks, and
hence I'm trying to differentiate updating standbys from paused standbys.
Right now I'm thinking we can change "restoring-standby-tasks" to
"updating-standby-tasks", and change "restore" in all other metrics (except
"restoring-active-tasks") to "state-update", i.e. "state-update-ratio",
"state-update-records-total", "updating-standby-tasks" etc.

3. Regarding the function name: yeah, I think that's a valid concern. I can
change it to "onRestoreSuspended", since "Aborted" may mislead people into
thinking that previous "onBatchRestored" calls are undone as part of the
abort.


Guozhang



On Mon, Sep 19, 2022 at 10:47 AM Bruno Cadonna  wrote:

> Hi Guozhang,
>
> Thanks for the KIP! I think this KIP is a really nice addition to better
> understand what is going on in a Kafka Streams application.
>
> 1.
> The metric names "paused-active-tasks" and "paused-standby-tasks" might
> be a bit confusing since at least active tasks can be paused also
> outside of restoration.
>
> 2.
> Why is the type of the metrics "stream-state-metrics"? I would have
> expected "stream-thread-metrics" as the type.
>
> 3.
> Isn't the value of the metric "restoring-standby-tasks" simply the
> number of standby tasks since standby tasks are basically always
> updating (aka restoring)?
>
> 4.
> "idle-ratio", "restore-ratio", and "checkpoint-ratio" seem metrics
> tailored to the upcoming state updater. They do not make much sense with
> a stream thread. Would it be better to introduce a new metrics level
> specifically for the state updater?
>
> 5.
> Personally, I do not like to use the word "restoration" together with
> standbys since restoration somehow implies that there is an offset for
> which the active task is considered restored and active processing can
> start. In other words, restoration is finite. Standby tasks rather
> update continuously their states. They can be up-to-date or lagging. I
> see that you could say "restored" instead of "up-to-date" and "not
> restored" instead of "lagging", but IMO it does not describe well the
> situation. That is a rather minor point. I just wanted to mention it.
>
> 6.
> The name "onRestorePaused()" might be confusing since in Kafka Streams
> users can also pause tasks. What about "onRestoreAborted()" or
> "onRestoreSuspended"?
>
> Best,
> Bruno
>
>
> On 16.09.22 19:33, Guozhang Wang wrote:
> > Hello everyone,
> >
> > I'd like to start a discussion for the following KIP, aiming to improve
> > Kafka Stream's restoration visibility via new metrics and callback
> methods:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility
> >
> >
> > Thanks!
> > -- Guozhang
> >
>


-- 
-- Guozhang


Re: KafkaConsumer refactor proposal

2022-09-16 Thread Guozhang Wang
Hi Philip,

On Fri, Sep 16, 2022 at 1:20 PM Philip Nee  wrote:

> Hi Guozhang,
>
> Thank you for the reviews, and I agree with your suggestions: If we find
> any behavior changes, we will issue a KIP. Is it possible to first publish
> the new implementation in a separate class so that we don't disrupt the
> existing implementation? Then we can issue a KIP to address the
> regressions.
>
Yes! That's what I meant to clarify, just in case other people are
confused about why we did not propose a KIP right away, but started with a
one-pager.


> To your questions:
> 1. I think the coordinator discovery will happen on demand, i.e., the
> background thread will only try to discover a coordinator when the event
> (e.g., commit) requires one. I've noted that, and I'll make sure that in
> the proposal.
>
Thanks. Could you then clarify that for the coordinator state transition?
Is that just between `down` and `initialized`, with `initialized` also
able to transition back to `down`?
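
To pin down what is being asked, the transitions could be written out as in
the sketch below. Only the state names `down`, `initialized` and `stable`
come from the 1-pager discussion; the enum name `BackgroundState` and the
transitions out of `STABLE` are assumptions made for illustration.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the background thread's coordinator state.
enum BackgroundState {
    DOWN, INITIALIZED, STABLE;

    /** States reachable from this one in a single step (assumed, not confirmed). */
    Set<BackgroundState> next() {
        switch (this) {
            case DOWN:
                return EnumSet.of(INITIALIZED);
            case INITIALIZED:
                // Without any commit request the coordinator is never discovered,
                // so the thread must be able to shut down straight from here.
                return EnumSet.of(DOWN, STABLE);
            case STABLE:
                // Assumption: losing the coordinator drops back to INITIALIZED.
                return EnumSet.of(DOWN, INITIALIZED);
            default:
                throw new IllegalStateException("unknown state: " + this);
        }
    }

    boolean canTransitionTo(BackgroundState target) {
        return next().contains(target);
    }
}
```

With manual assignment and no commits, a consumer in this model would only
ever move `DOWN -> INITIALIZED -> DOWN`.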


> 2. Acked. I think I'll add a section about how each operation works in the
> new implementation.

> 3a. I will modify the existing state enum in the new implementation.
> 3b. Ah, I must have missed this when proofreading the document. I think the
> state should be:
>   Unjoined -> Commit_async -> Revoking_Partitions -> Partitions_Revoked ->
> Join_Group -> Completed_Rebalancing -> Assigning_Partitions ->
> Partitions_Assigned -> Stable
>   I've made a note of that, and I'll update the document.
>
Got it. If we are introducing a third state for auto-committing, then upon
completing the rebalance we may also need to transition to that state,
since we may also revoke partitions, right? This is for fixing KAFKA-14224.


> 4. Noted. I'll add a section about exception handling.
> 5a.  Kirk also had the same comment. I updated the document.
> 5b. yes!
>
> Regarding the POC comments, I think the POC branch is actually quite dated.
> I need to update it . Do you think we can start with simple PRs like
> skeletons of ConsumerBackgroundThread.java and EventHandler interface and
> implementations?
>
Yes, I think as long as we are introducing new classes / interfaces in the
internal package, we can incrementally merge PRs even if they are not
integrated with the public KafkaConsumer class yet.


> 1. I agree the naming can be confusing, but I think the usage of response
> and request has implications for the type of event. As you said, BT -> PT
> handles error and callback events, and PT -> BT handles mostly API calls.
> I'll think again about these naming.
> 2. Great suggestions.
> 3. I think I'll start with defining a few basic events, like assign,
> commits, and fetch. Then we can modify that as we progress with the
> project.
>
> Thanks,
> P
>
>
> On Thu, Sep 15, 2022 at 3:02 PM Guozhang Wang  wrote:
>
> > Hello Philip,
> >
> > Thanks for writing down the 1-pager. Just to clarify, the reason we wrote
> > this as a 1-pager instead of a KIP is that so far all the implementations
> > should be internal, and hence not impacting anything on the public APIs.
> > If, e.g. we found along with the implementation that some public
> interfaces
> > like metrics, need to be modified, then we will send out a separate
> thread
> > for that in a KIP proposal, right?
> >
> > I made a pass on the 1-pager and also some of the ongoing developments,
> > here are just some high-level comments:
> >
> > On the 1-pager:
> >
> > 1) I think it's better to clarify the scenarios under manual assignment
> > a.k.a. `consumer.assign()`. E.g. does the background thread still always
> > tries to discover coordinator even if there's no commit requests (note
> > that, if yes, this would be a behavioral change since some users like
> > Connect may rely on the consumer to not ever try to discover the
> > coordinator with manual assignment and no commit to brokers)? From the
> > description it seems if there's no events from the channel to request
> > committing offsets, it would not try to discover coordinator, but then
> the
> > background thread's state would would be in `down` and `initialized`, not
> > in `stable`, and hence we should also allow transition from `initialized`
> > to `down` directly, right?
> >
> > 2) From the polling thread, besides the `poll` function changes, I think
> > it's better to also state other blocking function changes like commitSync
> > as well. I'd assume e.g. for commitSync it would be implemented as:
> >
> > * Send the commit-request to the server-event channel
> > * Continue polling from the consumer event channel, but skip other events
> > like rebalance-listener (we 

[DISCUSS] KIP-869: Improve Streams State Restoration Visibility

2022-09-16 Thread Guozhang Wang
Hello everyone,

I'd like to start a discussion on the following KIP, which aims to improve
Kafka Streams' restoration visibility via new metrics and callback methods:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility


Thanks!
-- Guozhang


[jira] [Created] (KAFKA-14239) Merge StateRestorationIntegrationTest into RestoreIntegrationTest

2022-09-16 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-14239:
-

 Summary: Merge StateRestorationIntegrationTest into 
RestoreIntegrationTest
 Key: KAFKA-14239
 URL: https://issues.apache.org/jira/browse/KAFKA-14239
 Project: Kafka
  Issue Type: Improvement
  Components: unit tests
Reporter: Guozhang Wang


We have two integration test classes for store restoration, and 
StateRestorationIntegrationTest only has one single test method. We can merge 
it with the other to save integration testing time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Request for contributor permissions

2022-09-16 Thread Guozhang Wang
Hello Akshay,

Thanks for your interest! You should have the permissions now.

Guozhang

On Fri, Sep 16, 2022 at 7:40 AM Akshay Bhadange 
wrote:

> Hello sir/madam ,
>
> I am really interested in this project and want to contribute.so please
> provide me the contributor permission.my jira id is
> (akshaybhadange21).thanks for your help
>


-- 
-- Guozhang


Re: KafkaConsumer refactor proposal

2022-09-15 Thread Guozhang Wang
Hello Philip,

Thanks for writing down the 1-pager. Just to clarify, the reason we wrote
this as a 1-pager instead of a KIP is that so far all the implementations
should be internal, and hence should not impact any public APIs. If, in
the course of the implementation, we find that some public interfaces
(e.g. metrics) need to be modified, then we will send out a separate
thread for that as a KIP proposal, right?

I made a pass on the 1-pager and also some of the ongoing developments,
here are just some high-level comments:

On the 1-pager:

1) I think it's better to clarify the scenarios under manual assignment,
a.k.a. `consumer.assign()`. E.g. does the background thread still always
try to discover the coordinator even if there are no commit requests (note
that, if yes, this would be a behavioral change, since some users like
Connect may rely on the consumer never trying to discover the coordinator
with manual assignment and no commits to brokers)? From the description it
seems that if there are no events from the channel requesting offset
commits, it would not try to discover the coordinator; but then the
background thread's state would only ever be `down` or `initialized`, never
`stable`, and hence we should also allow a direct transition from
`initialized` to `down`, right?

2) From the polling thread, besides the `poll` function changes, I think
it's better to also state other blocking function changes like commitSync
as well. I'd assume e.g. for commitSync it would be implemented as:

* Send the commit-request to the server-event channel
* Continue polling from the consumer event channel, but skip other events
like rebalance-listener (we still need to bookkeep them for the next `poll`
call, but we just cannot trigger them since that breaks compatibility)
until we receive the commit response event.

Some details about how those functions would be implemented would also be
very helpful for the community's audience.

3) I have a few questions from the rebalance state machine section:

3.a). you mentioned in the state machine:

"Wait for the partition revoked event, and advance the state to
PARTITIONS_REVOKED"
"Wait for partition assignment completion from the polling thread.  Advance
to PARTITIONS_ASSIGNED"

But we do not have those states today, I think you meant to say
"PREPARE_REBALANCING" and "STABLE"?

3.b). Also, it seems that the proposed state transition would be Stable ->
Revoking_Partitions -> Prepare_Rebalancing -> Complete_Rebalancing ->
Assigning_Partitions -> Stable (for eager protocol at least), but when auto
commit is enabled, we also need to commit offsets for those revoking
partitions, and the auto-committing happens before the
`onPartitionsRevoked` listener is triggered, so should auto-committing be
part of the `Revoking_Partitions` state as well?

4) Could you expand on the "Exceptions thrown will be different."
description and list all changes to the exceptions (note that, if any
exist, we also need a KIP)? For example, besides WakeupException, Kafka
also has an InterruptException (different from Java's own
InterruptedException) defined on the public APIs. Are we going to change
which functions throw it and which do not?

5) In the testing plan:

5.a) Could you elaborate a bit more on "We need to make sure the timing of
the 1.  coordinator discovery and 2.  joinGroup operations are being done
in the correct timing."

5.b) I'd also add that for all the blocking APIs, including `poll`, where a
timeout value is provided either as a param or implicitly from
`default.api.timeout.ms`, the timeout would now be strictly respected. To
me this is also one of the key motivations of this refactoring :)
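
The commitSync flow assumed in point 2), together with the strict timeout
from point 5.b), could be sketched roughly as follows. This is only an
illustration under stated assumptions: the class `CommitSyncSketch`, its
`EventType` enum and the three queues are hypothetical names, not the POC's
actual classes.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical polling-thread side of a blocking commit; names are illustrative.
final class CommitSyncSketch {
    enum EventType { COMMIT_REQUEST, COMMIT_RESPONSE, REBALANCE_LISTENER }

    // Polling thread -> background thread.
    final BlockingQueue<EventType> requests = new LinkedBlockingQueue<>();
    // Background thread -> polling thread.
    final BlockingQueue<EventType> responses = new LinkedBlockingQueue<>();
    // Events held back so the next poll() call can trigger them.
    final Queue<EventType> deferred = new ArrayDeque<>();

    /** Returns true if the commit was acknowledged before the timeout, false otherwise. */
    boolean commitSync(Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        requests.add(EventType.COMMIT_REQUEST);
        try {
            while (true) {
                long remaining = Math.max(deadline - System.nanoTime(), 0L);
                EventType event = responses.poll(remaining, TimeUnit.NANOSECONDS);
                if (event == null) {
                    return false; // timeout strictly respected
                }
                if (event == EventType.COMMIT_RESPONSE) {
                    return true;
                }
                deferred.add(event); // bookkeep, but do not trigger inside commitSync
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

A real implementation would presumably surface errors and interruption as
exceptions rather than a boolean; the boolean just keeps the sketch short.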

-

And the current POC PRs:

1. Just a quick thought about the naming of `KafkaServerEventQueue`: I
agree with others that it may be confusing to include `KafkaServer` in a
client class name. What about renaming the queues to
`ConsumerRequestEventQueue` and `ConsumerResponseEventQueue`? I know that
it sounds a bit awkward to have the `ResponseEventQueue` also return
rebalance-listener-triggering events, but that may be less confusing.

2. I'd suggest that for new modules like `ConsumerBackgroundThread`, we
first define an interface in the `internals` package, e.g.
`RequestEventHandler` (assuming the previous rename suggestion), and then
have a `DefaultRequestEventHandler` implementation class which encapsulates
the background thread. This enables us to easily write unit tests that
isolate other modules, especially with concurrent threading.

3. For `KafkaServerEventType`: where would NOOP be used? Also, I think
there are other types as well, like FETCH_COMMITTED?
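
For point 2, the shape I have in mind is roughly the sketch below. The name
`RequestEventHandler` is the one suggested above, but its method signatures
and the `InlineRequestEventHandler` test double are assumptions for
illustration; a `DefaultRequestEventHandler` wrapping the real background
thread is omitted here.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

// Assumed shape of the internal interface; the signatures are hypothetical.
interface RequestEventHandler<REQ, RESP> {
    /** Hands a request to whatever processes it (e.g. a background thread). */
    boolean add(REQ request);

    /** Non-blocking: returns the next response, if one is ready. */
    Optional<RESP> poll();
}

// Test double that answers synchronously, so callers can be unit-tested
// without starting any thread.
final class InlineRequestEventHandler implements RequestEventHandler<String, String> {
    private final Queue<String> responses = new ArrayDeque<>();

    @Override
    public boolean add(String request) {
        return responses.add("ack:" + request);
    }

    @Override
    public Optional<String> poll() {
        return Optional.ofNullable(responses.poll());
    }
}
```

Code on the polling-thread side would depend only on the interface, so tests
can swap in the inline double and stay deterministic.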



Thanks,
Guozhang


On Tue, Sep 13, 2022 at 2:14 PM Philip Nee  wrote:

> Hi all,
>
> Here is the proposal to refactor the Kafka Consumer
> <
> https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor
> >.
> The 1-pager is on the wiki, so please take a look at it.  Also, 

Re: [VOTE] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-14 Thread Guozhang Wang
Thanks David,

There are a few minor comments pending in the discussion thread, and one is
about whether we should merge PreparePartitionAssignment with HB. But I
think the KIP itself is in pretty good shape now. Thanks!


Guozhang

On Fri, Sep 9, 2022 at 1:32 AM David Jacot 
wrote:

> Hi all,
>
> Thank you all for the very positive discussion about KIP-848. It looks
> like folks are very positive about it overall.
>
> I would like to start a vote on KIP-848, which introduces a brand new
> consumer rebalance protocol.
>
> The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D.
>
> Best,
> David
>


-- 
-- Guozhang


Re: [VOTE] KIP-862: Self-join optimization for stream-stream joins

2022-09-13 Thread Guozhang Wang
Thanks Vicky! I'm +1.

Guozhang

On Mon, Sep 12, 2022 at 7:02 PM John Roesler  wrote:

> Thanks for the updates, Vicky!
>
> I've reviewed the KIP and your POC PR,
> and I'm +1 (binding).
>
> Thanks!
> -John
>
> On Mon, Sep 12, 2022, at 09:13, Vasiliki Papavasileiou wrote:
> > Hey Guozhang,
> >
> > Great suggestion, I made the change.
> >
> > Best,
> > Vicky
> >
> > On Fri, Sep 9, 2022 at 10:43 PM Guozhang Wang 
> wrote:
> >
> >> Thanks Vicky, that reads much clearer now.
> >>
> >> Just regarding the value string name itself: "self.join" may be
> confusing
> >> compared to other values that people would think before this config is
> >> enabled, self-join are not allowed at all. Maybe we can rename it to
> >> "single.store.self.join"?
> >>
> >> Guozhang
> >>
> >> On Fri, Sep 9, 2022 at 2:15 AM Vasiliki Papavasileiou
> >>  wrote:
> >>
> >> > Hey Guozhang,
> >> >
> >> > Ah it seems my text was not very clear :)
> >> > With "TOPOLOGY_OPTIMIZATION_CONFIG will be extended to accept a list
> of
> >> > optimization rule configs" I meant that it will accept the new value
> >> > strings for each optimization rule. Let me rephrase that in the KIP to
> >> make
> >> > it clearer.
> >> > Is it better now?
> >> >
> >> > Best,
> >> > Vicky
> >> >
> >> > On Thu, Sep 8, 2022 at 9:07 PM Guozhang Wang 
> wrote:
> >> >
> >> > > Thanks Vicky,
> >> > >
> >> > > I read through the KIP again and it looks good to me. Just a quick
> >> > question
> >> > > regarding the public config changes: you mentioned "No public
> >> interfaces
> >> > > will be impacted. The config TOPOLOGY_OPTIMIZATION_CONFIG will be
> >> > extended
> >> > > to accept a list of optimization rule configs in addition to the
> global
> >> > > values "all" and "none" . But there are no new value strings
> mentioned
> >> in
> >> > > this KIP, so that means we will apply this optimization only when
> `all`
> >> > is
> >> > > specified in the config right?
> >> > >
> >> > >
> >> > > Guozhang
> >> > >
> >> > >
> >> > > On Thu, Sep 8, 2022 at 12:02 PM Vasiliki Papavasileiou
> >> > >  wrote:
> >> > >
> >> > > > Hello everyone,
> >> > > >
> >> > > > I'd like to open the vote for KIP-862, which proposes to optimize
> >> > > > stream-stream self-joins by using a single state store for the
> join.
> >> > > >
> >> > > > The proposal is here:
> >> > > >
> >> > > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-862%3A+Self-join+optimization+for+stream-stream+joins
> >> > > >
> >> > > > Thanks to all who reviewed the proposal, and thanks in advance for
> >> > taking
> >> > > > the time to vote!
> >> > > >
> >> > > > Thank you,
> >> > > > Vicky
> >> > > >
> >> > >
> >> > >
> >> > > --
> >> > > -- Guozhang
> >> > >
> >> >
> >>
> >>
> >> --
> >> -- Guozhang
> >>
>


-- 
-- Guozhang


Re: [VOTE] KIP-862: Self-join optimization for stream-stream joins

2022-09-09 Thread Guozhang Wang
Thanks Vicky, that reads much clearer now.

Just regarding the value string name itself: "self.join" may be confusing
next to the other values, since people might think that before this config
is enabled, self-joins are not allowed at all. Maybe we can rename it to
"single.store.self.join"?

Guozhang

On Fri, Sep 9, 2022 at 2:15 AM Vasiliki Papavasileiou
 wrote:

> Hey Guozhang,
>
> Ah it seems my text was not very clear :)
> With "TOPOLOGY_OPTIMIZATION_CONFIG will be extended to accept a list of
> optimization rule configs" I meant that it will accept the new value
> strings for each optimization rule. Let me rephrase that in the KIP to make
> it clearer.
> Is it better now?
>
> Best,
> Vicky
>
> On Thu, Sep 8, 2022 at 9:07 PM Guozhang Wang  wrote:
>
> > Thanks Vicky,
> >
> > I read through the KIP again and it looks good to me. Just a quick
> question
> > regarding the public config changes: you mentioned "No public interfaces
> > will be impacted. The config TOPOLOGY_OPTIMIZATION_CONFIG will be
> extended
> > to accept a list of optimization rule configs in addition to the global
> > values "all" and "none" . But there are no new value strings mentioned in
> > this KIP, so that means we will apply this optimization only when `all`
> is
> > specified in the config right?
> >
> >
> > Guozhang
> >
> >
> > On Thu, Sep 8, 2022 at 12:02 PM Vasiliki Papavasileiou
> >  wrote:
> >
> > > Hello everyone,
> > >
> > > I'd like to open the vote for KIP-862, which proposes to optimize
> > > stream-stream self-joins by using a single state store for the join.
> > >
> > > The proposal is here:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-862%3A+Self-join+optimization+for+stream-stream+joins
> > >
> > > Thanks to all who reviewed the proposal, and thanks in advance for
> taking
> > > the time to vote!
> > >
> > > Thank you,
> > > Vicky
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-09 Thread Guozhang Wang
Hello David,

Alright, I think that's sufficient. Just to make that clear in the doc,
could we update:

1) the heartbeat request handling section, stating when the coordinator
will trigger a rebalance based on the HB's member metadata / reason?
2) the "Rebalance Triggers" section to include what we described in the
"Group Epoch - Trigger a rebalance" section as well?


Guozhang

On Fri, Sep 9, 2022 at 1:28 AM David Jacot 
wrote:

> Hi Guozhang,
>
> I thought that the assignor will always be consulted when the next
> heartbeat request is constructed. In other words,
> `PartitionAssignor#metadata` will be called for every heartbeat. This
> gives the opportunity for the assignor to enforce a rebalance by
> setting the reason to a non-zero value or by changing the bytes. Do
> you think that this is not sufficient? Are you concerned by the delay?
>
> Best,
> David
>
> On Fri, Sep 9, 2022 at 7:10 AM Guozhang Wang  wrote:
> >
> > Hello David,
> >
> > One of Jun's comments make me thinking:
> >
> > ```
> > In this case, a new assignment is triggered by the client side
> > assignor. When constructing the HB, the consumer will always consult
> > the client side assignor and propagate the information to the group
> > coordinator. In other words, we don't expect users to call
> > Consumer#enforceRebalance anymore.
> > ```
> >
> > As I looked at the current PartitionAssignor's interface, we actually do
> > not have a way yet to instruct how to construct the next HB request, e.g.
> > when the assignor wants to enforce a new rebalance with a new assignment,
> > we'd need some customizable APIs inside the PartitionAssignor to indicate
> > the next HB telling broker about so. WDYT about adding such an API on the
> > PartitionAssignor?
> >
> >
> > Guozhang
> >
> >
> > On Tue, Sep 6, 2022 at 6:09 AM David Jacot 
> > wrote:
> >
> > > Hi Jun,
> > >
> > > I have updated the KIP to include your feedback. I have also tried to
> > > clarify the parts which were not cleared.
> > >
> > > Best,
> > > David
> > >
> > > On Fri, Sep 2, 2022 at 4:18 PM David Jacot 
> wrote:
> > > >
> > > > Hi Jun,
> > > >
> > > > Thanks for your feedback. Let me start by answering your questions
> > > > inline and I will update the KIP next week.
> > > >
> > > > > Thanks for the KIP. Overall, the main benefits of the KIP seem to
> be
> > > fewer
> > > > > RPCs during rebalance and more efficient support of wildcard. A few
> > > > > comments below.
> > > >
> > > > I would also add that the KIP removes the global sync barrier in the
> > > > protocol which is essential to improve group stability and
> > > > scalability, and the KIP also simplifies the client by moving most of
> > > > the logic to the server side.
> > > >
> > > > > 30. ConsumerGroupHeartbeatRequest
> > > > > 30.1 ServerAssignor is a singleton. Do we plan to support rolling
> > > changing
> > > > > of the partition assignor in the consumers?
> > > >
> > > > Definitely. The group coordinator will use the assignor used by a
> > > > majority of the members. This allows the group to move from one
> > > > assignor to another by a roll. This is explained in the Assignor
> > > > Selection chapter.
> > > >
> > > > > 30.2 For each field, could you explain whether it's required in
> every
> > > > > request or the scenarios when it needs to be filled? For example,
> it's
> > > not
> > > > > clear to me when TopicPartitions needs to be filled.
> > > >
> > > > The client is expected to set those fields in case of a connection
> > > > issue (e.g. timeout) or when the fields have changed since the last
> > > > HB. The server populates those fields as long as the member is not
> > > > fully reconciled - the member should acknowledge that it has the
> > > > expected epoch and assignment. I will clarify this in the KIP.
> > > >
> > > > > 31. In the current consumer protocol, the rack affinity between the
> > > client
> > > > > and the broker is only considered during fetching, but not during
> > > assigning
> > > > > partitions to consumers. Sometimes, once the assignment is made,
> there
> > > is
> > > > > no opportunity for read affinity because no replicas of 

Re: [VOTE] KIP-865: Support --bootstrap-server in kafka-streams-application-reset

2022-09-09 Thread Guozhang Wang
+1. Thanks.

Guozhang

On Fri, Sep 9, 2022 at 9:52 AM Николай Ижиков  wrote:

> Hello.
>
> I'd like to start a vote on KIP-865, which adds support for the
> --bootstrap-server parameter in the kafka-streams-application-reset tool
>
> Discuss Thread:
> https://lists.apache.org/thread/5c1plw7mgmzd4zzqh1w59cqopn8kv21c
> KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-865%3A+Support+--bootstrap-server+in+kafka-streams-application-reset
> JIRA: https://issues.apache.org/jira/browse/KAFKA-12878



-- 
-- Guozhang


Re: [DISCUSSION] KIP-864: Support --bootstrap-server in kafka-streams-application-reset

2022-09-09 Thread Guozhang Wang
Hi, I think we can vote right away.

Guozhang

On Fri, Sep 9, 2022 at 2:24 AM Николай Ижиков  wrote:

> Hello, Guozhang.
>
> Thanks for the feedback.
> As this KIP very straightforward, is it worth to be voted right now?
> Or should we wait more feedback?
>
> > 9 сент. 2022 г., в 08:11, Guozhang Wang  написал(а):
> >
> > Hello Николай,
> >
> > Thanks for writing the KIP, I think it's rather straightforward and
> better
> > to be consistent in tooling params. I'm +1.
> >
> > Guozhang
> >
> >
> > On Mon, Sep 5, 2022 at 11:25 PM Николай Ижиков 
> wrote:
> >
> >> Hello.
> >>
> >> Do we still want to make parameter names consistent in tools?
> >> If yes, please, share your feedback on KIP.
> >>
> >>> 31 авг. 2022 г., в 11:50, Николай Ижиков 
> >> написал(а):
> >>>
> >>> Hello.
> >>>
> >>> I would like to start discussion on small KIP [1]
> >>> The goal of the KIP is to add the same --bootstrap-server parameter to
> >>> the `kafka-streams-application-reset.sh` tool as other tools use.
> >>> Please, share your feedback.
> >>>
> >>> [1]
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-864%3A+Support+--bootstrap-server+in+kafka-streams-application-reset
> >>>
> >>
> >>
> >
> > --
> > -- Guozhang
>
>

-- 
-- Guozhang


Re: [DISCUSSION] KIP-864: Support --bootstrap-server in kafka-streams-application-reset

2022-09-08 Thread Guozhang Wang
Hello Николай,

Thanks for writing the KIP. I think it's rather straightforward, and it's
better to be consistent in tooling params. I'm +1.

Guozhang


On Mon, Sep 5, 2022 at 11:25 PM Николай Ижиков  wrote:

> Hello.
>
> Do we still want to make parameter names consistent in tools?
> If yes, please, share your feedback on KIP.
>
> > 31 авг. 2022 г., в 11:50, Николай Ижиков 
> написал(а):
> >
> > Hello.
> >
> > I would like to start discussion on small KIP [1]
> > The goal of the KIP is to add the same --bootstrap-server parameter to
> > the `kafka-streams-application-reset.sh` tool as other tools use.
> > Please, share your feedback.
> >
> > [1]
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-864%3A+Support+--bootstrap-server+in+kafka-streams-application-reset
> >
>
>

-- 
-- Guozhang


Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-08 Thread Guozhang Wang
e underlying "engine". The main difference is that
> the
> > > > Consumer
> > > > > > >> > assigns topic-partitions to members whereas Connect assigns
> tasks
> > > > to
> > > > > > >> > workers. I see two ways to move forward:
> > > > > > >> > 1) We extend the new proposed APIs to support different
> resource
> > > > types
> > > > > > >> > (e.g. partitions, tasks, etc.); or
> > > > > > >> > 2) We use new dedicated APIs for Connect. The dedicated APIs
> > > > would be
> > > > > > >> > similar to the new ones but different on the
> content/resources and
> > > > > > >> > they would rely on the same engine on the coordinator side.
> > > > > > >> >
> > > > > > >> > I personally lean towards 2) because I am not a fan of
> > > > overcharging
> > > > > > >> > APIs to serve different purposes. That being said, I am not
> > > > opposed to
> > > > > > >> > 1) if we can find an elegant way to do it.
> > > > > > >> >
> > > > > > >> > I think that we can continue to discuss it here for now in
> order
> > > > to
> > > > > > >> > ensure that this KIP is compatible with what we will do for
> > > > Connect in
> > > > > > >> > the future.
> > > > > > >> >
> > > > > > >> > Best,
> > > > > > >> > David
> > > > > > >> >
> > > > > > >> > On Mon, Aug 8, 2022 at 2:41 PM David Jacot <
> dja...@confluent.io>
> > > > wrote:
> > > > > > >> > >
> > > > > > >> > > Hi all,
> > > > > > >> > >
> > > > > > >> > > I am back from vacation. I will go through and address
> your
> > > > comments
> > > > > > >> > > in the coming days. Thanks for your feedback.
> > > > > > >> > >
> > > > > > >> > > Cheers,
> > > > > > >> > > David
> > > > > > >> > >
> > > > > > >> > > On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris <
> > > > gharris1...@gmail.com
> > > > > > >> >
> > > > > > >> > wrote:
> > > > > > >> > > >
> > > > > > >> > > > Hey All!
> > > > > > >> > > >
> > > > > > >> > > > Thanks for the KIP, it's wonderful to see cooperative
> > > > rebalancing
> > > > > > >> > making it
> > > > > > >> > > > down the stack!
> > > > > > >> > > >
> > > > > > >> > > > I had a few questions:
> > > > > > >> > > >
> > > > > > >> > > > 1. The 'Rejected Alternatives' section describes how
> member
> > > > epoch
> > > > > > >> > should
> > > > > > >> > > > advance in step with the group epoch and assignment
> epoch
> > > > values. I
> > > > > > >> > think
> > > > > > >> > > > that this is a good idea for the reasons described in
> the
> > > > KIP. When
> > > > > > >> the
> > > > > > >> > > > protocol is incrementally assigning partitions to a
> worker,
> > > > what
> > > > > > >> member
> > > > > > >> > > > epoch does each incremental assignment use? Are member
> epochs
> > > > > > >> re-used,
> > > > > > >> > and
> > > > > > >> > > > a single member epoch can correspond to multiple
> different
> > > > > > >> > (monotonically
> > > > > > >> > > > larger) assignments?
> > > > > > >> > > >
> > > > > > >> > > > 2. Is the Assignor's 'Reason' field opaque to the group
> > > > > > >> coordinator? If
> > > > > > >> > > > not, should custom client-side assignor implementations
> > > > interact
> >

Re: [DISCUSS] KIP-857: Streaming recursion in Kafka Streams

2022-09-08 Thread Guozhang Wang
Hello Nick,

Thanks for the patient explanations. I'm convinced that we should not
exclude repartitioning inside the recursive operator, and in fact we
cannot programmatically forbid it through the UnaryOperator itself anyway.

Just a few nits on the page itself:

1) Could you still include in the example code a manual repartitioning step
inside the recursive step? It would illustrate that as long as the final
stream's key-value types match those of the starting stream, we can
include arbitrary intermediate topics in the middle.

2) Regarding the requirement that the unary operator must terminate: I have
seen cases of iterative streaming where the iterations do not necessarily
terminate, and which instead rely on a follow-up step (e.g. a windowed
aggregation) to emit final results. So I'm wondering whether the
requirement that the operator terminate -- which we cannot enforce
programmatically anyway -- is just there to make sure the processing space
does not explode (which seems like an implementation detail that need not
be exposed to, or required of, users), or whether it has any bearing on
processing semantics?

Could we, e.g. add another code example similar to this one:
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java#L87-L117

The main purpose is just to verify and illustrate that the proposed API
can cover cases with both terminating and potentially non-terminating
operators.
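
As a rough illustration of the distinction between terminating and non-terminating operators, here is a plain-Java simulation of feeding an operator's output back into its own input. It does not use the Kafka Streams API or the proposed "recursively" method; `RecursionSketch`, `recurse`, and the `maxSteps` safety cap are all hypothetical names introduced only for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Plain-Java simulation only; none of these names exist in Kafka Streams.
class RecursionSketch {

    // Applies `step` to each record; every emitted record is queued for another pass.
    // `maxSteps` caps the work so a non-terminating operator cannot loop forever.
    static List<Integer> recurse(List<Integer> input,
                                 Function<Integer, Optional<Integer>> step,
                                 int maxSteps) {
        Deque<Integer> pending = new ArrayDeque<>(input);
        List<Integer> seen = new ArrayList<>();
        int steps = 0;
        while (!pending.isEmpty() && steps < maxSteps) {
            Integer record = pending.poll();
            seen.add(record);
            step.apply(record).ifPresent(pending::add);
            steps++;
        }
        return seen;
    }

    public static void main(String[] args) {
        // Terminating operator: counts down and stops emitting at zero.
        Function<Integer, Optional<Integer>> countdown =
            v -> v > 0 ? Optional.of(v - 1) : Optional.empty();
        System.out.println(recurse(List.of(3), countdown, 100));

        // Non-terminating operator: always emits, so only the cap stops it.
        Function<Integer, Optional<Integer>> forever = v -> Optional.of(v + 1);
        System.out.println(recurse(List.of(0), forever, 5));
    }
}
```

In a real stream there is no such cap: a non-terminating operator simply keeps producing records, and a downstream step would have to decide when results are final.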


Other than that, I do not have any further comments. Great job!

Guozhang


On Tue, Sep 6, 2022 at 10:25 AM Nick Telford  wrote:

> The more I think about this, the more I think that automatic repartitioning
> is not required in the "recursively" method itself. I've removed references
> to this from the KIP, which further simplifies everything.
>
> I don't see any need to restrict users from repartitioning, either before,
> after or inside the "recursively" method. I can't see a scenario where the
> recursion would cause problems with it.
>
> Nick
>
> On Tue, 6 Sept 2022 at 18:08, Nick Telford  wrote:
>
> > Hi Guozhang,
> >
> > I mentioned this in the "Rejected Alternatives" section. Repartitioning
> > gives us several significant advantages over using an explicit topic and
> > "to":
> >
> >- Repartition topics are automatically created and managed by the
> >Streams runtime; explicit topics have to be created and managed by
> the user.
> >- Repartitioning topics have no retention criteria and instead purge
> >records once consumed, this prevents data loss. Explicit topics need
> >retention criteria, which have to be set large enough to avoid data
> loss,
> >often wasting considerable resources.
> >- The "recursively" method requires significantly less code than
> >recursion via an explicit topic, and is significantly easier to
> understand.
> >
> > Ultimately, I don't think repartitioning inside the unary operator adds
> > much complexity to the implementation. Certainly no more than other DSL
> > operations.
> >
> > Regards,
> > Nick
> >
> > On Tue, 6 Sept 2022 at 17:28, Guozhang Wang  wrote:
> >
> >> Hello Nick,
> >>
> >> Thanks for the re-written KIP! I read through it again, and so far have
> >> just one quick question off the top of my head regarding repartitioning: it
> >> seems to me that when there's an intermediate topic inside the recursion
> >> step, then using this new API would basically give us the same behavior
> as
> >> using the existing `to` APIs. Of course, with the new API the user can
> >> make
> >> it more explicit that it is supposed to be recursive, but efficiency
> wise
> >> it provides no further optimizations. Is my understanding correct? If
> yes,
> >> I'm wondering if it's worth the complexity to allow repartitioning
> inside
> >> the unary operator, or should we just restrict the recursion inside a
> >> single sub-topology.
> >>
> >>
> >> Guozhang
> >>
> >> On Tue, Sep 6, 2022 at 9:05 AM Nick Telford 
> >> wrote:
> >>
> >> > Hi everyone,
> >> >
> >> > I've re-written the KIP, with a new design that I think resolves the
> >> issues
> >> > you highlighted, and also simplifies usage.
> >> >
> >> >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams
> >> >
> >> > Note: I'm still workin

Re: [VOTE] KIP-862: Self-join optimization for stream-stream joins

2022-09-08 Thread Guozhang Wang
Thanks Vicky,

I read through the KIP again and it looks good to me. Just a quick question
regarding the public config changes: you mentioned "No public interfaces
will be impacted. The config TOPOLOGY_OPTIMIZATION_CONFIG will be extended
to accept a list of optimization rule configs in addition to the global
values "all" and "none"." But there are no new value strings mentioned in
this KIP, so does that mean we will apply this optimization only when `all`
is specified in the config?
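
To show what "a list of optimization rule configs in addition to the global values" could look like, here is a minimal plain-Java sketch of resolving such a config value. The class `OptimizationConfigSketch` and the rule names `self.join` and `merge.repartition` are hypothetical and chosen only for illustration; the KIP text quoted above does not name the actual value strings.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; not the StreamsConfig implementation.
class OptimizationConfigSketch {

    // Assumed rule names, used only for illustration.
    static final List<String> KNOWN_RULES = List.of("self.join", "merge.repartition");

    // Resolves "all", "none", or a comma-separated rule list into the enabled rules.
    static Set<String> enabledRules(String configValue) {
        String value = configValue.trim();
        if (value.equals("none")) {
            return new LinkedHashSet<>();
        }
        if (value.equals("all")) {
            return new LinkedHashSet<>(KNOWN_RULES);
        }
        Set<String> enabled = new LinkedHashSet<>();
        for (String rule : value.split(",")) {
            String name = rule.trim();
            if (!KNOWN_RULES.contains(name)) {
                throw new IllegalArgumentException("Unknown optimization rule: " + name);
            }
            enabled.add(name);
        }
        return enabled;
    }

    public static void main(String[] args) {
        System.out.println(enabledRules("all"));
        System.out.println(enabledRules("self.join"));
    }
}
```

Under this reading, a rule with no value string of its own could only be switched on through "all", which is the case the question above is asking about.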


Guozhang


On Thu, Sep 8, 2022 at 12:02 PM Vasiliki Papavasileiou
 wrote:

> Hello everyone,
>
> I'd like to open the vote for KIP-862, which proposes to optimize
> stream-stream self-joins by using a single state store for the join.
>
> The proposal is here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-862%3A+Self-join+optimization+for+stream-stream+joins
>
> Thanks to all who reviewed the proposal, and thanks in advance for taking
> the time to vote!
>
> Thank you,
> Vicky
>


-- 
-- Guozhang


Re: Last sprint to finish line: Replace EasyMock/Powermock with Mockito

2022-09-08 Thread Guozhang Wang
Thanks Christo for the updates. As we add new unit tests, we are also keen
on using the new Mockito packages, and so far I'd say it's much easier to
use :) I'll chime in to help review some of the PRs as well.


Guozhang

On Tue, Sep 6, 2022 at 11:02 PM Christo Lolov
 wrote:

> Hello!
>
> This is the (roughly) bi-weekly update on the Mockito migration.
>
> Firstly, the following PRs have been merged since the last email so thank
> you to the writers (Yash and Divij) and reviewers (Dalibor, Mickael, Yash,
> Bruno and Chris):
>
> https://github.com/apache/kafka/pull/12459
> https://github.com/apache/kafka/pull/12473
> https://github.com/apache/kafka/pull/12509
>
> Secondly, this is the latest list of PRs that are in need of a review to
> get them over the line:
>
> https://github.com/apache/kafka/pull/12409
> https://github.com/apache/kafka/pull/12418 (I need to respond to the
> comments on this one, so the first action is on me)
> https://github.com/apache/kafka/pull/12465
> https://github.com/apache/kafka/pull/12492
> https://github.com/apache/kafka/pull/12505 (I need to respond to
> Dalibor’s comment on this one, but the overall PR could use some more eyes)
> https://github.com/apache/kafka/pull/12524
> https://github.com/apache/kafka/pull/12527
>
> Lastly, I am keeping https://issues.apache.org/jira/browse/KAFKA-14133 and
> https://issues.apache.org/jira/browse/KAFKA-14132 up to date, so if
> anyone has spare bandwidth and would like to assign themselves some of the
> unassigned tests, their contributions would be welcome :)
>
> Best,
> Christo



-- 
-- Guozhang


Re: [DISCUSS] KIP-857: Streaming recursion in Kafka Streams

2022-09-06 Thread Guozhang Wang
e this method take a parameter,
> > explicitly specifying the KStream that records are fed back in to, making
> > the above two examples:
> >
> > updates
> > .map((parent, count) -> KeyValue(parent, count + 1))
> > .join(parents, (count, parent) -> { KeyValue(parent, count) })
> > .recursively(updates)
> >
> > and:
> >
> > foo
> > .filter((key, value) -> value <= 0)
> > .mapValues((value) -> value - 1)
> > .recursively(foo)
> >
> > We could *also* support a 0-ary version of the method that defaults to
> > recursively executing the previous node, but I'm worried that users may
> not
> > fully understand the consequences of this, inadvertently creating
> infinite
> > loops that are difficult to debug.
> >
> > Finally, I'm not convinced that "recursively" is the best name for the
> > method. Perhaps "recursivelyVia" or "recursivelyTo"? Ideas welcome!
> >
> > If we want to prevent this method being "abused" to merge different
> > streams together, it should be trivial to ensure that the provided
> argument
> > is an ancestor of the current node, by recursively traversing up the
> > process graph.
> >
> > I hope this clarifies your questions. It's clear that the KIP needs more
> > work to better elaborate on these points. I haven't had a chance to
> revise
> > it yet, due to more pressing issues with EOS stability that I've been
> > looking into.
> >
> > Regards,
> >
> > Nick
> >
> > On Tue, 23 Aug 2022 at 23:50, Sophie Blee-Goldman
> >  wrote:
> >
> >> Hey Nick,
> >>
> >> Sounds like an interesting KIP, and I agree the current way of achieving
> >> this in Streams
> >> seems wildly overcomplicated. So I'm definitely +1 on adding a smooth
> API
> >> that abstracts
> >> away a lot of the complexity and unnecessary topic management.
> >>
> >> That said, I've found much of the discussion so far on the API itself to
> >> be
> >> very confusing -- for example, I don't understand this point:
> >>
> >>  I actually considered a "recursion" API, something
> >> > like you suggested, however it won't work, because to do the recursion
> >> you
> >> > need to know both the end of the KStream that you want to recurse, AND
> >> the
> >> > beginning of the stream you want to feed it back into.
> >>
> >>
> >> As I see it, the internal implementation should be, and is, essentially
> >> independent from the
> >> design of the API itself -- in other words, why does calling this
> >> operator/method `recursion`
> >> not work, or have anything at all to do with what Streams "knows" or how
> >> it
> >> does the actual
> >> recursion? And why would calling it recursion be any different from
> >> calling
> >> it/reusing the existing
> >> `to` operator method?
> >>
> >> On that note, the proposal to reuse the `to` operator for this purpose
> is
> >> the other thing I've found
> >> to be very confusing. Can you expand on why you think `to` would be
> >> appropriate here vs a
> >> dedicated recursion operator? I actually think it would be fairly
> >> misleading to have the `to` operator
> >> do something pretty wildly different depending on what you passed in, I
> >> mean stream recursion seems
> >> quite far removed from its current semantics -- I just don't really see
> >> the
> >> connection.
> >>
> >> so tl;dr why not give this operation its own dedicated operator/method
> >> name, vs reusing an existing operator that does something else?
> >>
> >> Overall though this sounds great, thanks for the KIP!
> >>
> >> Cheers,
> >> Sophie
> >>
> >> On Thu, Aug 18, 2022 at 4:48 PM Guozhang Wang 
> wrote:
> >>
> >> > Hello Nick,
> >> >
> >> > Thanks for the replies! They are very thoughtful. I think I agree with
> >> you
> >> > that requiring the output stream to a source stream is not sufficient
> >> for a
> >> > valid recursion, and even without the proposed API users today can
> still
> >> > create a broken recursive topology.
> >> >
> >> > Just want to clarify another question:
> >> >
> >> > In our current examples, the linked output stream and input stream are
>
